分布式锁


Author
|
Earl
Describe
|
该文档列举常见共享数据场景下并发线程安全问题的传统解决方案及其局限性,实现、优化和分析了基于Redis、Zookeeper、Mysql的三种分布式锁,对第三方提供的分布式锁进行了介绍和使用说明
Reference
|
Last Update
|
2024-8-31

 

概述

超卖现象

  1. 概念:

    • 因为多个用户即多个线程并发访问共享库存数据,在不采取上锁等措施情况下因为多个线程的临界区代码执行次序错乱导致丢失部分数据更新操作,最终导致库存扣减小于实际的卖出数量,当库存耗尽前已经售出超过库存数量的商品导致无货可发,这就是超卖现象

缓存穿透

  1. 概念:查询一个一定不存在的数据,默认情况下没有该数据的缓存,由于缓存不命中,请求将会去查询数据库,但是数据库也没有该记录,如果不将此次查询的结果null写入缓存,那么相同的请求每次都会去请求数据库,如果有恶意请求针对不存在商品进行高频攻击,会给数据库造成瞬时高压,可能直接把数据库压垮

  2. 解决办法:

    • 查询查不到结果,就将空结果也进行缓存,并设置一个短暂的过期时间,这样一方面是避免缓存过大,另一个方面是避免空值数据万一有了数据无法及时更新

    • 也可以使用布隆过滤器对高频ip进行封禁

缓存雪崩

  1. 概念:设置缓存时key使用了相同的过期时间,导致缓存再某一时刻同时失效,然而此时的并发请求非常高,瞬间请求压力全部给到数据库,数据库瞬间压力过重雪崩

  2. 解决办法:在原有失效时间上添加一个短时间内的随机值【如1-5min随机】,这样每个缓存的过期时间重复率降低,从而很难发生极短时间内缓存集体失效的情况

缓存击穿

  1. 某些热点数据可能在瞬间突然被超高并发地访问,比如秒杀,但是对应的key正好在大量请求瞬间到来前已经失效,且在超高并发请求到来前没有请求再次形成缓存,那么瞬间的超高并发对同一个key对应的数据查询压力全部落在数据库上,称为缓存击穿,又比如一个接口只缓存一个数据结果,但是这个结果总会失效,失效的瞬间加入还是高并发请求【如首页商品分类数据】,此时所有并发查询压力就会直接加到数据库上

  2. 解决办法:

    • 对重建缓存的过程加双重检查锁,对超高的瞬时并发,只让一个请求通过去重建缓存,剩下的请求都等待缓存

 

数据场景

Mysql



 

  1. 数据场景

    • 共享数据存在于Mysql数据库中,适用于系统并发量小的业务场景,多个线程在一个方法中并发对mysql中的共享数据进行先查后改操作,修改操作为对当前数据减1

       

  2. 测试

    • 测试环境:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求直连单个服务,服务操作虚拟机上的数据库

    • 测试结果:仅操作服务本地缓存数据,没有涉及数据库时吞吐量为5000次/s;添加数据库,单个线程在一个方法内获取一次库存数据,更新一次库存数据,吞吐量减小为2000,数据库库存数据从5000变成4789,5000次扣减库存请求全部成功,理论上数据库库存应减为0,发生超卖现象,出现线程安全问题

 

解决方法


ReentrantLock
  1. 业务逻辑:对查数据库数据和更改数据库数据在服务器层面整体加可重入锁,用无锁并发的方式保证临界区代码原子性,实现对数据库数据的串行更新

  2. 测试

    • 测试环境:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求直连单个服务,服务操作虚拟机上的数据库

    • 测试结果:吞吐量为544,所有库存扣减请求全部成功,数据库库存被正确减为0

    • 结果分析:吞吐量减少相较于修改本地服务内存缓存降低了90%,对数据库访问和加可重入锁对系统并发性能削减很可观,访问数据库导致吞吐量减半,然后加可重入锁导致性能继续降低至访问数据库条件下吞吐量的25%【注意5000的吞吐量是在控制台进行了5000次打印操作,如果没有打印操作将会更高】

  3. 解决方法优缺点

    • ReentrantLockSynchronized都属于JVM本地锁,拥有相同的优缺点,ReentrantLock解决线程安全问题方法的分析见Synchronized解决方法优缺点

     

 

Sychronized
  1. 业务逻辑:对查数据库数据和更改数据库数据在服务器层面整体加Synchronized,用锁独占阻塞的方式来保证临界区代码原子性

  2. 测试

    • 测试环境:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求直连单个服务,服务操作虚拟机上的数据库

    • 测试结果:吞吐量为554,所有库存扣减请求全部成功,数据库库存被正确减为0

    • 结果分析:synchronized和ReentrantLock在这种场景下性能差不多,ReentrantLock只是减少了上下文切换的性能开销

  3. 解决方案优缺点

    • 三种情况可能导致JVM本地锁失效

      • 多例模式可能导致JVM本地锁失效:ReentrantLock和Synchronized一般只能应用在单例对象的方法中来保护对共享成员变量的访问【比如在SpringBoot中的Service通过注入stockMapper来使用同一个单例Mapper操作数据库中的一个共享库存数据】,SpringBoot中默认service就是单例的,因此使用JVM本地锁使用能保证线程安全问题,但是一旦在service类名上通过配置@Scope(value="prototype",proxyMode=ScopedProxyMode.TARGET_CLASS)将service改成多例的,ReentrantLock就可能变成多把锁,Synchronized如果是原本锁当前类对象,导致仍然会有多个线程同时操作共享数据出现线程安全问题,经过测试虽然吞吐量上来了变成了1811【几乎接近没上锁访问数据库】,但是发生了超卖现象,库存数量理论上为0实际上为4850,出现了线程安全问题

      • 使用单体事务注解@Transactional可能导致JVM本地锁失效:在添加JVM本地锁的方法上添加事务注解@Transactional可能导致锁失效,锁失效是有概率的,不一定会失效,但是多线程并发的情况下概率也很高,锁失效的原理是事务注解@Transactional是通过AOP的方式来在方法执行前开启事务,在方法执行结束后提交事务,加上JVM锁以后单个线程的执行逻辑是开启事务--获取锁--操作数据库--释放锁--提交/回滚;如果是多个线程并发就可能在执行流程期间出现线程安全问题,原因是Mysql的默认隔离级别是可重复读,即已经完成的数据库操作还没有提交,查询操作就返回提交前的数据库数据。如果两个线程同时执行同一个上了JVM锁的方法,两个线程都开始事务然后去竞争锁,线程1抢到锁,线程2进入阻塞;线程1执行完数据库操作还没有提交就把锁释放了,因为Mysql的默认事务隔离级别是可重复读,那么在线程1的对数据库操作还没有提交的情况下线程2已经读取了数据库,此时就会读到线程1读取到的还没有被线程1修改的库存数据,当线程2提交的时候就会直接覆盖线程1的修改结果,导致线程1的更新操作全部丢失,发生超卖现象,出现线程安全问题,根本原因是mysql的默认隔离级别是可重复读Repeatable_Read以及JVM提供的锁锁不住AOP实现的由@Transactional在方法开始前开启锁,在方法结束后再提交事务,在释放锁和提交事务期间其他线程就可能读取到还没有被提交的旧数据

        • 🔎:一般数据库的事务隔离级别会设置为RC【读已提交】和RR【可重复读】,但是这两种隔离级别都是读取已经提交后的数据​

      • 集群部署下JVM本地锁失效:JVM本地锁在服务使用集群部署时操作数据库或外置缓存共享数据时也会导致锁失效,原因类似于Ioc组件在多例模式下锁组件本身或者可重入锁是非静态成员变量时锁失效的原因,根本原因还是允许多个线程同时对共享数据进行写操作;使用两台运行实例,使用nginx做负载均衡,吞吐量略微提升至610,发生线程安全问题,出现超卖现象;单台nginx只负载一台运行实例,吞吐量为493,相较于直连运行实例性能稍降降的不多

 

只使用一条SQL
  1. 业务逻辑:

    • 整个方法所有对数据库的增删改查操作都只使用一条SQL,就能避免加事务发生锁失效;

    • 同时因为mysql内部使用悲观锁来保证一条SQL执行的原子性,因此即使是集群部署环境下,也能保证多个线程对数据库中同一个共享数据的线程安全问题;

    • 因为mysql内部保证了一条SQL语句的原子性,因此方法连JVM的锁都不用加

  2. 测试

    • 测试环境:

      • 1️⃣:​100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个数据库的同一个共享数据,查询数据库库存和更新数据库库存两条SQL简化为一条SQLupdate db_stock set count=count-#{count} where product_code=#{product_no} and count >= #{count}

      • 2️⃣:在1️⃣的环境基础上再开启服务组件的多例模式

      • 3️⃣:在1️⃣的环境基础上在服务中使用@Transactional注解手动开启事务

    • 测试结果:

      • 1️⃣:测试1的吞吐量为1953,所有库存扣减请求全部成功,数据库库存被正确减为0

      • 2️⃣:测试2的吞吐量为2224,所有库存扣减请求全部成功,数据库库存被正确减为0

      • 3️⃣:测试3的吞吐量为1850,所有库存扣减请求全部成功,数据库库存被正确减为0

    • 结果分析:一条SQL因为mysql内部加了悲观锁,因此服务中不需要加锁,不需要加事务,集群下也能保证数据库共享数据的操作原子性,性能提升非常可观;不仅能满足集群部署无锁并发线程安全性,还不需要加事务[1],而且在服务组件单例或者多例模式下都能保证共享数据的并发线程安全,不管是锁、事务还是多例都能可观影响系统的并发性能

  3. 解决方案优缺点

    • 优点

      • 能显著提升系统的并发性能,完美解决JVM本地锁三种难以避免的失效场景,还能避免使用锁,事务,多例模式下也是并发线程安全的

    • 缺点

      • Mysql中的锁范围问题,mysql中会根据SQL语句调整锁的范围,如果是使用的表锁就会锁整张表,在并发业务下这种表锁是不可接受的,需要使用更细腻的行级锁来锁操作涉及的相关记录

      • 很难应对复杂的业务场景,比如数据库中经常存在一个商品在多个仓库都有库存的情况,现实业务中往往存在一个仓库无货但是可以从其他仓库调货的情况,但是要使用数据库来做这种业务逻辑判断使用一条SQL实现是比较困难的,对复杂业务很不友好

      • 也因为一条SQL的限制,很难记录共享数据变化前后的准确状态输出到日志中

 

Mysql悲观锁


  1. 业务逻辑:

    • 给整个更新数据库的业务方法添加@Transactional注解添加手动事务

    • 使用SQL语句select * from db_stock where product_code=#{productCode} for update查询库存表所有商品编号为productCode的商品库存记录,并给所有查询出的记录上行级锁,

    • 查询出商品所在所有仓库以后,实际开发中对选用哪一个仓库是要经过很复杂的判断的,比如距离远近、是否还有货等等业务逻辑进行判断, 这里简单地只判断库存数量是否足够,

    • 选中仓库就扣减对应的库存数量

  2. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个数据库的同一个共享数据,使用手动事务,用select ... for update给所有可能操作的记录上行级锁,用后端服务来完成复杂逻辑的运算

    • 测试结果:

      • 1️⃣:测试1的吞吐量为600,所有库存扣减请求全部成功,数据库库存被正确减为0【这里老师简化了仓库选择逻辑,直接扣除第一个仓库5000次库存的】

    • 结果分析:

      • 1️⃣:使用mysql的行级锁的并发性能比JVM本地锁稍微好一点,但是比单条SQL差很多,因为此前是锁一条SQL,一个锁周期内服务器只会和数据库做一次交互,此时为了复杂业务逻辑使用行级锁和手动事务,一个锁周期内服务器会多次和数据库交互

  3. 解决方案优缺点

    • 优点:

      • 相比于一条sql能处理复杂的业务逻辑,可以获取实时的数据进行分析,可以使用代码自己写业务逻辑,也可以使用第三方框架或者大数据框架来对数据进行分析,相比于一条sql获取处理数据更灵活、功能强大的多;行级锁也更好控制

    • 缺点:

      • mysql的悲观锁性能即使是行级锁并发性能只是比JVM本地锁略高,比一条sql的性能要低的多

      • mysql的悲观锁同样存在死锁现象,死锁发生在两个客户端都开启了事务,且都想对对方已经上锁的记录进行上锁,注意,任何客户端都能同时开启事务并尝试对已经上锁的记录加锁,并发情况下容易引起死锁问题,注意Mysql发生死锁会报错DeadLock

      • 一旦表中有一个地方使用了select...for update,其他业务逻辑不能使用普通的select来做业务避免出现并发性的问题

        • 这个存疑,记录被锁住了,其他线程还能操作吗?还是说使用了select...for update如果其他不使用for update上锁的操作不会等待锁会直接失败?

        • 🔑:明白了,是因为虽然线程1使用select...for update上锁了,但是其他业务线程2如果使用普通的select,select操作不会被阻塞,只有更新操作会被阻塞【类似CopyOnWriteArrayList的读写并发】,但是如果其他业务将还没提交的数据读取到,在处理期间上锁的原业务已经提交了,此时做出的更改就是基于线程1上锁前的数据,而非线程1提交后的数据,此时多业务对共享数据的更新操作就会出现线程安全问题,因此使用select...for update所有表下的业务都得使用select...for update

 

Mysql乐观锁


  1. 业务逻辑

    • 在要使用乐观锁的表中添加字段时间戳或者版本号,时间戳只需要使用当前最新时间即可,如果应用程序的时间戳比表中的时间戳旧或者不同,说明在应用程序处理期间发生了更新操作,就需要拿着新值对应用程序进行重试,此时数据库返回的结果是更新操作影响的行数是0,因为时间戳或者版本号对不上了,此时再次完整执行业务方法直到更新记录条数为1的情况下说明操作是线程安全的,jdbc的update相关方法的返回值就是本次更新影响的记录条数【Mybatis和MP中的也是一样的】,老师的重试使用的递归,后面测试两个服务都发生了栈内存溢出

    • 更新成功需要把时间戳改成最后更新的时间,sql应该满足update 表名 set 目标字段=具体值,时间戳=当前系统时间 where 条件字段=具体值 and 时间戳=应用程序提前获取的时间戳

    • 使用版本号和使用时间戳的逻辑是相同的,只是版本号要控制像时间戳一样单增,使用时间戳有个问题是时间精度的问题,如果只精确到毫秒时间间隔可能太短,这个还需要进行验证,并没有可靠的案例支持,使用版本号不存在这方面顾虑,update 表名 set 目标字段=具体值,版本号=版本号+1 where 条件字段=具体值 and 版本号=应用程序提前获取的版本号

  2. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个数据库的同一个共享数据,使用手动事务,用版本号字段作为乐观锁状态位,在mysql中一行sql保证查询版本号和更新版本号的原子性,在应用程序中实现cas操作失败无锁重试,在方法上添加了事务注解,重试递归调用了同一个方法

      • 2️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个数据库的同一个共享数据,使用手动事务,用版本号字段作为乐观锁状态位,在mysql中一行sql保证查询版本号和更新版本号的原子性,在应用程序中实现cas操作失败无锁重试,在方法上移除事务注解,仍然使用递归调用方法,但是每次尝试sleep(20)阻塞睡眠20ms

    • 测试结果

      • 1️⃣:吞吐量48,错误率飙升至68%,两个服务都出现栈内存溢出,不断报错,最后抛出异常SQLTransientConnectionException数据库连接超时异常

      • 2️⃣:吞吐量229,且吞吐量越来越小,所有库存扣减请求全部成功,数据库库存被正确减为0

    • 结果分析

      • 1️⃣:无锁并发重试递归调用方法存在栈内存溢出的风险,而且并发度越高,竞争越激烈,栈内存溢出风险也越高;此外方法内进行CAS无锁重试,如果竞争太激烈更新失败会导致一直在重试,但是加了事务注解手动控制事务,在事务未提交以前,数据库的DML操作会给对应的记录加行级锁或者给整个表加表锁,此时后面的重试发起的sql操作是被阻塞等待的,根本不能成功,阻塞超过默认30s就会报SQL连接超时异常,把手动事务删掉,自动事务只会加在单条DML操作上,执行失败悲观锁会立即释放掉,因此重试时不会发生阻塞等待的问题;至于栈内存溢出是递归调用方法将方法加载到栈内存导致的,这里老师的解决方法是每次重试都间隔20ms,这特么也有点离谱,都间隔20ms也是并发啊,感觉使用do...while比较合理,不去循环加载方法区代码,老师这里只是减少了对递归方法的调用次数来减少栈内存溢出的概率,感觉不是很可靠

      • 2️⃣:吞吐量越来越小是因为竞争越来越激烈,在不加事务注解对重试进行阻塞,对重试间隔控制在20ms的递归方法调用的情况下所有请求都成功,数据库也不会发生线程安全问题,这种乐观锁重试大量无用的对数据库访问严重影响系统性能,效果甚至不如JVM本地锁

  3. 解决方案优缺点

    • 优点

      • 乐观锁不会导致死锁,但是悲观锁是有一定概率导致死锁的

    • 缺点

      • 高并发情况下,性能极低,因为存在大量的重试访问数据库

      • 乐观锁如果使用无序改动数据比如上面的库存数据作为判断是否可以进行CAS操作的判据就会有ABA问题,ABA问题即,在正式进行cas操作前,cas操作虽然成功,但是在获取旧值和执行cas操作期间旧值如果被修改为值B和其他值,但是执行CAS操作前又改回了A值,CAS操作本身是无法判断出来的,其他业务可能使用中间值就执行其他业务了,这里使用时间戳或者版本号单增不会出现这个问题

      • 读写分离情况下可能导致乐观锁不可靠,读写分离一般会搭建主从集群,采用主从复制的形式,让写操作去主集群,让读取操作去从集群中读取数据

        • 🔎:主集群发生写操作会把写操作记录到自己的binlog日志中,从集群会不停地从binlog日志中拉取日志并记录到从集群的relaylog中继日志中,从集群读取中继日志replay把主集群动作SQL重演一次,对从集群执行相同的写操作,这个过程延迟比较大,记录binlog日志、将binlog读取出来发送给从集群、写入relay日志,从relay日志读取进行replay一共四次IO外加一次网络IO导致延迟比较大,因此主从分离系统同步的延迟就较大

        • 🔎:主从分离在高并发情况下就非常容易导致主集群中写操作已经发生,但是从集群还没有同步完成,导致读取不到已经写入主集群的新数据,此时使用乐观锁就不合适,本来更新操作已经发生,但是因为从集群有延迟,cas操作读取从集群数据,但是因为延迟读取不到新值只能读到旧值,导致CAS操作成功,丢失上一次修改,乐观锁失去了作用

         

 

 

 

Mysql悲观锁范围

  1. 概念

    • 思考mysql的悲观锁默认是行锁还是表锁

  2. 测试

    • 测试环境

      • 1️⃣:两个mysql本地客户端操作同一个mysql数据库,在客户端1开始事务,使用更新语句update db_stock set count=count-1 where product_code='1001' and count>=0;更新数据库数据,然后客户端2使用更新语句update db_stock set count=count-1 where id=3;

    • 测试结果

      • 1️⃣:客户端1开启事务执行更新语句后,客户端2执行更新语句更新相同表与客户端1更新数据不相关的记录,客户端2进入阻塞等待,等待客户端1

    • 结果分析

      • 1️⃣:mysql客户端的事务默认是锁整张表,是表锁。一次更新操作期间别的SQL是无法对同一张表的任何数据进行操作的;mysql中悲观锁使用行级锁的条件是数据的查询或者更新条件必须是索引字段[1],同时查询或者更新条件必须是具体值,如果是模糊查询就会导致索引失效或者定位条件使用字段!=具体值,这两种情况悲观锁还是会使用表级锁,像=和in都会使用行级锁

  3. 备注

    • mysql手动事务需要使用begin;开始事务才会使用对应的锁来实现在提交事务commit;前锁住数据库中正在被操作的目标记录,注意这期间使用查询SQL是不会给记录上锁的,只有写操作和select...for update才会给对应的记录或者整张表上锁,select...for update上行级锁也遵循上面结果分析的规则;应用中加@Transactional注解也是使用手动事务,相当于mysql中使用手动事务;不使用手动事务的情况下mysql会对每行sql操作使用自动事务

     

     

Redis

  1. 数据场景

    • 共享数据存在于Redis数据库中,多个线程在一个方法中并发对Redis中的共享数据进行先查后改操作,修改操作为对当前数据减1

       

  2. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,没有任何锁或者保护线程安全措施

      • 2️⃣:相同条件再测试一次

    • 测试结果:

      • 1️⃣:吞吐量2047,5000次扣减操作全部成功,但是Redis中库存数据应从5000减至0,实际是3704,出现超卖现象

      • 2️⃣:吞吐量3268,5000次扣减操作全部成功,但是Redis中库存数据应从5000减至0,实际是3702,出现超卖现象

    • 结果分析

      • 1️⃣:出现线程安全问题,第二次测试的吞吐量相较于第一次性能提升非常大,这是因为第一次运行系统进行了预热

 

解决方法

JVM本地锁


 

Redis乐观锁


  1. Redis事务

    • watch key [key...]可以监控一个或者多个列举变量的变量值,搭配multiexec一起使用,Redis的事务提交是一旦在开启事务multi还没提交执行期间,如果watch指令监听的变量被其他客户端更改了,本次开启事务还未提交exec的指令将全部作废无法提交,exec指令执行会返回nil,会取消事务的执行

    • RedisTemplate对象和StringRedisTemplate对象都有对应的watch(key)watch(keys)multi()exec()方法,但是想像在客户端中使用指令一样使用这些方法是不行的,运行方法会直接报错RedisCommandExecutionExceptionRedis指令执行异常,抛异常的原因是服务器端使用这几个指令的方法不对,不能像直接在redis-cli中使用指令一样使用这几个方法,需要使用redisTemplate.execute(SessionCallback<T> session)

    • redisTemplate.execute(SessionCallback<T> session)中的SessionCallback<T>是一个函数式接口,在该接口的注释中标明允许替代用multi/discard/exec/watch/unwatch命令来使用事务,在服务端必须通过该对象来使用事务相关的指令,弹幕说是因为要绑定会话连接和当前线程;使用redisTemplate.execute(SessionCallback<T> session)方法需要匿名实现函数式接口SessionCallback<T>,需要重写其中的抽象方法execute(RedisCallback<T> action)并在该方法中调用RedisTemplate对象和StringRedisTemplate对象对应的watch(key)watch(keys)multi()exec()方法,但是不推荐直接通过RedisTemplate对象和StringRedisTemplate对象对这些方法进行调用,因为抽象方法的传参RedisCallback<T>是一个接口,RedisTemplate是其实现类,而StringRedisTemplateRedisTemplate的子类,因此实际上该抽象方法传参就是容器组件中的StringRedisTemplate,直接通过该参数action来调用Redis事务相关的方法watch(key)watch(keys)multi()exec()

    • 本地客户端用法

      【redis-cli客户端1】

      【redis-cli客户端2】

    • 在服务端使用Redis事务的代码正确示例

      【错误示例】

      • 不要直接在自定义方法中使用watchmultiexec方法,这种调用方式会直接在服务端抛RedisCommandExecutionExceptionRedis指令执行异常

  2. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用指令watchmultiexec来实现乐观锁控制多线程并发安全

    • 测试结果:

      • 1️⃣:吞吐量370,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

    • 结果分析

      • 1️⃣:Redis乐观锁能够解决并发线程安全问题,但是性能相较无锁情况下损失非常高,从原来的3000直接降成370

  3. 解决方案优缺点

    • 缺点:

      • Redis乐观锁的性能低,使用Jedis客户端能稍微提高性能,但是性能损耗依然很大,因此不推荐使用redis乐观锁

      • 使用Redis乐观锁过程中可能因为机器性能问题出现连接不够用的情况导致乐观锁失效

     

基于Redis实现分布式锁

 

基础实现



 

  1. 获取分布式锁的服务端代码示例

    • 递归重试代码示例

    • 循环重试代码示例

    • 测试

      • 测试环境:

        • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉递归重试代码示例实现的基于Redis的乐观锁对库存数量5000进行单次扣减1,累计5000次扣减请求

        • 2️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉循环重试代码示例实现的基于Redis的乐观锁对库存数量5000进行单次扣减1,累计5000次扣减请求

      • 测试结果:

        • 1️⃣:吞吐量549,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

        • 2️⃣:吞吐量605,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

      • 结果分析:

        • 1️⃣:吞吐量和JVM本地锁操作mysql中的共享数据差不多,效率还是低,感觉循环重试的网络开销才是影响性能的主要大头,老师自己都说递归调用这种行为不好存在爆栈风险,此前mysql的递归调用是没有改的,最好还是使用循环来重试获取锁

        • 2️⃣:基于Redis乐观锁的实现循环调用的性能比递归调用的性能略高

防宕机死锁



 

  1. 使用基于Redis实现的乐观分布式锁的关注要点

    • 1️⃣:不管是循环重试还是递归重试获取锁,都要控制重试的时间间隔,否则连接资源开销太大可能会压垮服务器,但是重试间隔时间太久也会压低系统性能,这个取间隔时间的标准要好好关注一下;

      • 🔎:老师对睡眠时间的理解是设置合适的睡眠时间能降低线程对锁的整体竞争压力,反而能提高使用乐观锁的系统性能,但是没有提及如何选取一个合适的间隔时间

      • :而且没有考虑最大重试时间或者次数的问题,一旦出现问题根本无法获取锁,所有请求就会一直进行递归或者循环重试,实际应用中都需要根据业务需求在原理的基础上重新设计

    • 2️⃣:此外下面的实现都是所有线程获取不到锁都会重试直到获取锁,这种方式适合解决超卖应用场景让每个扣减请求都成功正确扣减;对于类似重建分布式缓存这种场景只需要重建一次缓存,其余抢不到锁的请求阻塞等待,重建失败其余等待请求再次重试,缓存重建成功其余请求不再重试需要在该原理的基础上进一步修改,这个实际上应该在业务方法中自定义获取不到锁的行为,循环重试和递归重试都是业务方法的行为,分布式锁设计不需要考虑这一点,直接在业务方法中通过双重检查锁实现一次构建缓存即可

    • 3️⃣:这种基于Redis实现的分布式锁还要注意防死锁的发生,因为锁是从第三方获取并通过指令远程操纵第三方对锁进行释放的,如果服务器【服务器此时也可以叫做redis客户端程序】在获取锁执行业务代码期间宕机了,即使释放锁的操作写在finally语句块中也是无法被执行的【即使该服务器恢复了代码也无法继续向下执行释放锁的代码】,那么此时锁就变成死锁永远得不到释放了,要解决这个问题可以给锁对应的键值对设置一个合适的过期时间;

      • 🔎:Redis中提供指令EXPIRE <key> <seconds>来为指定key的键值对设置秒级别的过期时间,还提供了指令PEXPIRE <key> <milliseconds>来为指定key的键值对设置毫秒级别的过期时间,使用指令ttl <key>能查看锁对应键值对的过期时间【键值对过期以后ttl <key>指令的返回值为-2】。锁一旦过期就会自动被释放掉

      • 🔎:给锁相关键值对设置有效时间的代码不能放在获取锁以后分步来执行,因为可能会发生第一次执行获取锁时,客户端程序还没来得及给锁对应键值对设置有效时间就宕机了,那么在这种情况下仍然会发生死锁现象,需要保证获取锁和设置过期时间操作的原子性,此时我们可以考虑使用复杂的set指令来通过一条指令同时实现获取锁和设置过期时间的目标;redis和mysql一样能保证一条指令执行的原子性

      • 🔎:Redis中set指令的完整格式为set <key> <value> [Ex seconds] [Px milliseconds] [NX|XX],Ex表示设置秒级别的过期时间,Px表示设置毫秒级别的过期时间,NX表示当键值对不存在时该指令能成功执行,XX表示当键值对存在时该指令才能成功执行覆盖对应key的value值;指令set lock 111 ex 20 nx的意思是当redis中以lock作为key的键值对不存在时将键值对<"lock","111">存入redis中并设置键值对的过期时间为20s;这条指令对应封装在setIfAbsent(String key,String value)的重载方法setIfAbsent(String key,String value,long timeout,TimeUnit unit)中,循环重试加锁并原子性给锁设置有效时间的完整代码示例如下所示:

      • 🔎:设置了过期时间也可能带来一系列问题,第一个问题是假如线程1获取了锁且给锁上了有效时间,但是线程1可能因为其他原因一直被阻塞或者锁的过期时间太短【比如锁的有效时间是3s,但是执行业务方法就需要5s或者重试次数太多,可能执行过程中redis中的锁就被自动释放掉了】,等到锁过期了继续执行后续的业务代码,此时线程1就处于无锁裸奔的情况,有可能会发生线程安全问题;除此以外后续上锁的请求线程如线程2上的锁还有可能直接被前一个提前自动释放锁的线程1执行完业务方法调用delete方法直接手动将线程1的锁给释放掉了,导致后续的连续出错,而且后续请求执行时间大于锁的有效时间也会继续自动释放,最终所有请求的执行都处于无锁裸奔状态,此时有锁和无锁就没啥区别了,相当于锁失效;此时有两个问题,第一个是一个线程可能会去释放另外一个线程的锁,第二个问题是锁可能会在业务方法执行结束前提前失效,因此还需要实现防非当前线程释放当前线程的锁,以及业务方法没执行结束锁过期让锁自动续期

         

正确释放锁



  1. 防止非当前线程释放当前线程的锁,可以给当前线程上的锁加一个唯一标识,要实现每个线程锁唯一还要被所有线程抢占同一把锁可以让key相同,让对应的value作为当前线程的唯一标识【可以使用UUID作为该标识】;把释放锁的逻辑改成释放锁前先去判断一下锁是否是当前线程获取锁时的value,value只有和自己锁时相同才能由当前线程去释放锁,如果不相同则无需再释放锁,示例代码如下

    • 🔎:上述代码还是存在问题,因为没有保证删除该锁和判断锁的value是自己设置的两个过程是原子性的,在获取锁和删除锁期间还是有可能锁过期其他线程上锁然后被当前线程调用delete方法删除其他线程上的锁,导致下一个请求无锁裸奔,可能出现并发线程安全问题,Redis中没有指令既能判断键值对还能同时删除对应键值对的,只能借助Lua脚本来进行实现

  2. 使用Lua脚本来保证上述释放锁查询判断value值和当前线程获取锁生成的value值相同并删除对应锁两个操作的原子性

    • 使用lua脚本可以保证Redis多条指令原子性的原理是Redis客户端程序通过lua脚本把多个Redis指令一次性发送给Redis服务器,那么这些指令就不会被其他客户端指令打断。Redis的单线程设计也会保证脚本以原子性的方式执行即当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行。关于Lua语言的语法和Redis中使用Lua脚本执行Redis命令的知识请查看后端--Lua,这里只列举对应的java程序实现

    • RedisTemplate或者StringRedisTemplate对象中的execute方法有一个重载方法execute(RedisScript<T> script,List<String>) keys,Object... args)可以传递脚本scriptKEYS列表ARVG列表

      • 🔎execute方法的第一个参数RedisScript类型是一个接口且非函数式接口,该接口只有一个实现DefaultRedisScript,因此第一个参数需要通过构造方法传参不带换行的lua脚本即scriptnew DefaultRedisScript(script)将lua脚本封装成DefaultRedisScript,但是特别注意,如果使用该构造方法new DefaultRedisScript(script)封装lua脚本运行会直接抛异常UnsupportedOperationException,必须使用重载的双参构造方法指定返回值类型即new DefaultRedisScript(String script,Class<Object> resultType)指定返回值类型如new DefaultRedisScript(script,Boolean.class),这个返回值就是lua脚本输出在redis-cli控制台的返回值,这个返回值类型随便瞎写也不会报错,但是不指定返回值类型下列程序运行就会直接抛异常,注意new DefaultRedisScript<>(script,Boolean.class),这个尖括号要求的也是返回值的类型,第二个参数指定了该尖括号就不需要指定了,如果指定要保证两个类型要一致

    • 基于Redis和lua脚本实现的分布式锁的服务器端Java代码

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉基于Redis和lua脚本实现的分布式锁的服务器端Java代码示例实现的基于Redis的乐观锁对库存数量5000进行单次扣减1累计,使用uuid作为键值对的value,使用lua脚本来检查锁是否属于当前线程并删除键值对释放锁

    • 测试结果

      • 1️⃣:吞吐量650,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

 

锁重入实现



ReentrantLock原理

  1. 解决分布式锁不可重入导致死锁的问题

    • 原理:两个方法可能都需要占有同一把锁,当一个方法1占有锁以后调用另一个方法2,此时就会发生锁重入,如果锁设计时不支持可重入,那么就会发生方法1等待方法2执行结束以后才能释放锁,但是方法2需要方法1释放锁以后才能获取锁执行,此时就会发生死锁现象,因此在必要的情况下还需要设计分布式锁的可重入来避免发生死锁,分布式锁的可重入思想可以参考ReentrantLock的锁重入设计

    • ReentrantLock原理:这个黑马JUC把源码讲的很清楚了,看笔记就行,这里只记录要点

      • 🔎:ReentrantLock的默认无参构造方法是构造非公平实现的同步器,构造方法传参true是构造公平实现,传参false也是构造非公平实现,公平和非公平的区别体现在新线程是否要等队列中的节点对应线程都释放锁以后才有资格抢占锁【公平实现】还是在进入队列以前不管队列中有无节点都可以首先尝试获取锁【非公平实现】,同步器管理控制锁的行为,同步器继承抽象父类AQS,AQS是一切Java层面实现的锁【独占锁ReentrantLock、共享锁ReentrantReadWriteLock、Semaphore、CountdownLatch】的基础,AQS底层主要由int类型的state属性、int类型的waitStatus、从AQS的父类继承来的Thread类型的exclusiveOwnerThread属性和一个FIFO队列构成;AQS中线程上锁实质是多线程并发调用CompareAndSetState方法使用cas操作更改state属性的状态,在CompareAndSetState方法中调用UNSAFE类【Unsafe类可以通过类对象和属性相对于对象的内存偏移量直接对属性内存进行操作,里面提供了大量硬件级别的CAS原子性操作】的compareAndSwapInt方法,这里面的偏移量的计算在static静态代码块中,在类加载的时候就计算完毕了,Unsafe对象的获取方法上加了注解@CallerSensitive注解,只能在JDK的源码中使用【严重怀疑这里只是说AQS中获取的unsafe对象只能被JDK使用,具体回顾一下JUC中对Unsafe类的使用】,在用户自己的项目里面是使用不了的

      • 🔎:AQS类中有2000多行代码,大部分的同步器功能都已经被实现好了,用户可以基于AQS实现自定义独占锁和共享锁,要实现独占锁需要重写AQS中的tryAcquire(int)方法【该方法在AQS中直接抛异常,即子类不实现该方法上锁时就会直接抛UnsupportedOperationException异常】和tryRelease(int)方法;要实现共享锁需要子类重写tryAcquireShared(int)方法和tryReleaseShared(int)方法;ReentrantLock方法的同步器Sync只实现了tryRelease方法,tryAcquire方法由其非公平子类进行实现【非公平子类实现NonfairSync实际上只重写了tryAcquire方法和lock方法】

      • ReentrantLock锁重入的最重要的逻辑是,如果state等于0就直接cas操作加锁;如果state不等于0就判断当前线程是否占有锁的线程,如果是state加1【即重入次数加1】,否则进入阻塞队列阻塞;释放锁时让state减1,判断当前线程是否是锁的持有者,判断新state值是否为0,为0表示锁解干净了,设置owner为null,将state设置为新值并返回true,后续根据true唤醒享元下一个节点;如果不为0表示锁还没释放干净,将state设置为新值并返回false

    • ReentrantLock方法的上锁流程

      • 1️⃣reentrantLock.lock():用户调用ReentrantLock的上锁方法

      • 2️⃣nonfairSync.lock()ReentrantLock方法默认调用的是非公平实现的lock方法

      • 3️⃣AQS.acquire():非公平实现的lock调用的是同步器的继承自抽象父类AQS的acquire方法,在AQS的acquire方法中调用了非公平实现的tryAcquire方法尝试获取锁,在tryAcquire方法执行失败的情况下执行acquireQueued(addWaiter(Node.EXCLUSIVE),arg)方法来创建获取锁失败的线程对应的节点,当AQS队列没有节点的情况下再尝试两次获取锁,如果还是获取不到,节点入AQS队列,当前线程使用LockSupport的park方法自我进行阻塞

      • 4️⃣nonfairSync.tryAcquire():非公平实现的上锁方法tryAcquire方法又调用了从父类Sync中继承来的nonfairTryAcquire方法

      • 5️⃣sync.nonfairTryAcquire(int acquires):acquires的值实际上为1,在该方法中获取state的值,如果state等于0使用cas操作将其改为1,如果成功表示上锁成功返回true方法结束,如果state不为0就检查当前线程是否为此前占有锁的线程,如果不是直接返回false返回进入阻塞等待的逻辑,如果是当前占有锁的线程说明发生锁重入,将state状态加1,此时也只有当前线程能运行到此处,因此这个更新state状态无需使用cas操作,重入成功直接返回true方法结束

      • 6️⃣AQS.addWaiter(Node mode):该方法就是准备入队列前的准备方法了,首先创建一个关联当前线程的Node节点,检查AQS队列中的尾结点是否为null,如果不为null就使用cas操作将当前节点更改为尾节点,把当前节点连到尾结点上并返回node节点;如果尾结点为null说明队列中没有节点,就调用enq(node)方法,然后返回node节点

      • 7️⃣AQS.enq(final Node node):如果尾节点为空,说明AQS队列中没有等待的线程,但是当前线程因为竞争获取不到锁仍然要入队列,进入该方法是一个死循环,此时获取尾节点检查尾节点是否为null【这一步实际上是检查AQS队列的享元是否创建,没有享元就创建享元】,如果为null就创建一个不关联任何线程的Node节点【实际上是代表当前占有锁的线程,因为后面会根据享元的waitStatus,当前占有锁的线程来判断是否要唤醒后一个节点】,如果享元不为null就会在第二次循环中将当前线程对应的节点加到享元后面,并让享元的next指向当前节点,让当前节点的prev指向享元【所以AQS板上钉钉使用了双向链表结构】 ,让节点加入队列以后返回当前节点

    • ReentrantLock的解锁流程

      • 1️⃣reentrantLock.unlock():用户调用ReentrantLock的解锁方法

      • 2️⃣nonfairSync.release(1)ReentrantLock方法默认调用的是非公平实现的release方法,但是非公平实现中只有lock方法和tryAcquire方法,release方法也是从AQS中继承来的方法,而且release方法中的参数列表1是写死的,该方法中首先调用tryRelease(1)方法尝试解锁,解锁成功【只有包括重入的锁完全释放才会返回true】检查AQS队列的享元是否存在,存在的情况下waitStatus是否不为0,不为0就需要调用unparkSuccessor(h)方法唤醒享元的后继节点,该方法解锁成功就返回true,解锁失败就返回false

      • 3️⃣nonfairSync.tryRelease():tryRelease方法对应基于AQS实现的独占锁的tryAcquire方法,两个方法都抛出UnsupportedOperationException异常,都需要被子类同步器重写,可重入锁的tryRelease方法是由Sync而非NonFairSync实现的,这点与tryAcquire方法不同,这是因为公平锁和非公平锁的解锁流程是相同的,但是上锁时公平锁需要检查AQS队列中是否有节点,非公平锁不需要检查而是首先进行竞争的差异导致的;tryRelease方法首先会获取state的值并在值的基础上减1,判断当前线程是否锁的占有线程,不是直接抛异常IllegalMonitorStateException,如果锁的占有者是当前线程,判断state是否等于0,如果是说明锁释放干净了将exclusiveOwnerThread设置为null,然后将state设置为新值,注意只有锁释放干净了该方法才会将exclusiveOwnerThread设置为null并返回true,只有在返回true的情况下才会去尝试唤醒享元后面的节点

可重入分布式锁

  1. 参考ReentrantLock实现非公平的分布式可重入锁的lua脚本上锁和释放锁逻辑

    • 🔎:通过Redis中唯一的key和setnx指令来实现独占锁,通过value使用uuid作为当前线程的唯一标识,

    • 🔎:如何在Redis中保存锁的重入状态,可以考虑在UUID后面加个计数,但是这种方式实现起来比较复杂,在Redis中有一种数据结构叫Hash,Hash是一个键值对集合,key就是正常的key,value是一个String类型的field和value的映射表,即Java中的Map,value可以有很多个键值对,哈希特别适合存储对象,类似于Java中的Map<String,Object>,很像一个对象,key就是对象比如user对象的名字,有个字段叫age,值为20;还有个字段是银行余额balance,值为5000;即Hash可以认为是一个双层Map,也可以认为是一个对象

      • 使用命令hset user name '柳岩'插入Hash类型的数据到Redis中,对应的要实现基于Redis分布式锁可以将锁的数据封装成hset lock uuid 锁的重入次数,即将基于Redis的分布式设计为使用hash数据类型+lua脚本+乐观锁机制

    • 加锁实现逻辑如下,加锁、重入成功都返回1,加锁、重入失败都返回0,对应Java中的true和false

      • 1️⃣:通过Redis指令EXISTS <key>判断键值对是否存在线程占有锁,如果存在则返回1,不存在则返回0;

      • 2️⃣:如果返回0说明没有线程占用锁,当前线程可以直接通过Redis指令hset <key> <field> 1获取锁;通过Redis指令expire <key> <time>为锁设置有效时间防死锁

        • 🔎:ttl为-1表示当前键值对没有设置有效时间

      • 3️⃣:如果返回1说明有线程占用锁,当前线程使用Redis指令HEXISTS <key> <field>判断Redis中的锁的field是否当前线程的uuid,如果是返回1,如果不是返回0;

        • 如果是当前线程的uuid,说明发生锁重入,此时使用Redis指令hincrby <key> <field> <increment>将value值递增increment,然后使用Redis指令expire <key> <time>将锁的有效时间进行重置

        • 如果不是当前线程的uuid,说明锁已经被其他线程占有,此时线程应该进入阻塞,但是这个实现工作量太大,使用循环尝试或者递归重试加间隔时间代替,不亚于实现一个ReentrantLock,可以考虑直接在Java代码中使用ReentrantLock来阻塞唤醒,减少重试竞争

    • 对应的lua加锁脚本

      • 🔎:注意如果redis中没有对应的键值对,经过测试此时直接使用hexists lock uuid 1和命令hset lock uuid 1的效果是相同的,都是将该键值对加入hset中且value值为1,故下面的lua代码可以合并简化

      【合并简化的lua代码】

      • 🔎:lua的逻辑运算符只有andornot

      • 🔎:所有参数都使用传参的KEYS列表和ARVG列表传参,增强lua代码的复用性;键值对的key用KEYS列表,uuid和有效时间用ARGV列表传递

    • 解锁逻辑如下,如果锁不存在lua代码中返回nil,对应java代码中的null,可以在java代码中手动抛出异常

      • 1️⃣:使用Redis指令hexists判断当前线程获取的锁是否存在,该指令不存在返回0,存在返回1

      • 2️⃣:如果锁不存在就直接返回nil,由Java层面直接抛异常

      • 3️⃣:如果锁存在直接使用Redis指令hincrby <key> uuid -1将value值减1,然后判断减1后的value值是否为0,如果为0表示锁释放干净了直接使用Redis指令del <key>删除对应键值对返回1;不为0表示锁还没有释放干净,直接返回0

        • 🔎:注意指令hincrby <key> uuid -1的返回值就是value减去1后的值

    • 对应的lua释放锁脚本

  2. 使用Java在服务端远程操作Redis通过Redis、Lua脚本、乐观锁机制实现的可重入分布式锁

    • 实现逻辑:

      • 这个实现没有做基于AQS的阻塞唤醒,应该是可以在Java层面做到线程的阻塞唤醒的

      • 创建分布式锁类DistributedRedisLock,该类实现Lock接口,分布式锁只需要关注上锁和解锁方法,

        • 上锁方法只需要实现代参数的tryLock方法,不带参数的tryLock方法可以传参过期时间-1表示默认过期时间30s的锁,lock方法直接调用tryLock方法

        • 🔎:要使用自动注入注解来使用IOC容器组件的类也必须纳入IOC容器的管理才行,简单点添加@Component注解,以后使用就可以注入DistributedRedisLock直接使用,但是这样还是不够好,因为对于不同的分布式锁实现需要注入不同的组件,我们可以通过工厂设计模式来创建对应的实例对象,这样可以在工厂对象中使用IoC容器组件,分布式锁的具体实现不注入容器中,这样还能节省内存

        • 重新设计Redis中锁的键值对Hset的field字段,让uuid作为服务的标识,让获取锁时以线程id作为锁的标识,用每个服务唯一的uuid:线程id作为field字段;field字段用于解决分布式环境下的锁的可重入实现同一个线程对同一把锁多次上锁,让获取分布式锁的标准只判断指定key的HSet是否存在,如果不存在则直接上锁,如果存在则根据当前线程的id和服务的uuid是否和当前线程一致,一致说明是锁重入进入锁重入逻辑;如果不一致说明指定key的锁发生竞争,当前线程进入等待重试

          • 🔎:这里uuid:线程id只是用来判断是否发生锁重入,不是用来判断锁是否被占有,锁被占有的判据是key,所以不会存在因为线程的id不同导致filed字段不同导致相同的key不同的field发生锁不住的情况,即key相同,但是field不同的线程无法锁重入,对应key的HSet存在,因此上不了锁,等待重试

          • 🔎:此外,因为是以key对应的Hset作为锁,因此只要key不同,锁就不同,同一个线程要上多把锁可以根据Hset的key为区别区分不同的锁,即使这些锁的field是一样的但是他们的key不一样也是不同的锁,因此一个线程是可以上多把锁的

      【基于Redis的可重入失败的分布式锁实现】

      • 🔎:这里的上锁实现是执行lock方法首先去获取锁,但是一获取锁就去创建新的锁对象并创建新的uuid,即只要在不同方法中调用了distributedLockClient.getRedisLock("lock")后续即使调用lock方法内部方法应该是锁重入也会因为上的是不同的锁导致锁重入失败,这是因为无法在定义方法时就确定方法调用顺序的前提下就要区分方法调用导致的锁重入使用同一把锁引起的,因此还需要在锁内部设计在获取锁时判断外层方法是否有锁;判断方法被同一个线程调用的最好的标识就是Thread的id,通过检查有没有当前线程对应的key和相同的field就知道调用者是否已经对同一把锁上锁

        【分布式锁实现】

        【分布式锁工厂类实现】

        【业务类对锁的使用】

      【基于Redis的可重入的分布式锁实现】

      • 🔎:在上面实现的基础上更改表示锁的Redis中的HSet的filed字段,将field字段从uuid改成了uuid:线程id;让每次获取锁都获取全新的锁变成了每个服务对应一个uuid,同一个线程获取同一把锁​,不同的锁通过key进行唯一标识,锁重入通过key和field字段共同标识,从而同时实现一个线程可以通过key不同上多把锁,也可以通过key和field都相同来进行锁重入,key同field不同来对要获取同一把key的锁的其他线程进行阻塞重试,用lua一次提交不可被打断和Redis单线程特性来保证对上锁和解锁操作的原子性

      • 🔎:因为工厂类DistributedLockClient加了@Component注解,所以是单例的,其中的uuid属性只由该工厂类构造方法创建,因此也是单例的,所以uuid属性只是作为每个服务的判断标识,不再作为判断是否同一个线程锁重入的标识,使用更好的Thread.id作为锁重入的标识,让uuid:线程id来区分具体服务的线程,没有uuid两个服务的线程id可能相等,此时再按lua的上锁解锁逻辑就会导致另一个服务的同名线程上锁可以通过锁重入逻辑进行,发生锁不住的现象,所以此处作为服务标识的uuid也是必要的

        【分布式锁实现】

        【分布式锁工厂类实现】

        【业务类对锁的使用】

    • 测试

      • 测试环境:

        • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉基于Redis的hset和lua脚本实现的可重入失败分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

        • 2️⃣:在步骤1的基础上写一个获取相同分布式锁的方法1,在业务方法上锁的代码中调用方法1,让锁进行重入测试分布式锁的锁重入效果

        • 3️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉基于Redis的hset和lua脚本实现的可重入分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

      • 测试结果:

        • 1️⃣:吞吐量591,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

        • 2️⃣:锁重入失败了,因为每次上锁前获取锁都是获取的不同锁,每次都要创建全新的uuid,就不可能对同一个锁第二次上锁,因此锁重入失败

        • 3️⃣:吞吐量626,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

     

自动续期



  1. 演示JUC的任务调度线程池的局限性

    • JUC定时任务的销毁没有单独一个定时任务的销毁方法,唯一的关闭定时任务的API是shutdown()或者shutdownNow()会直接把整个定时任务线程池销毁,没有停掉一个定时任务的方法,想要某个任务停掉就很麻烦,所以JUC中的定时调度任务在这里也很不好用

    • 🔎:老师说这里适合使用java.util.Timer这个工具类来实现定时调度任务,Timer中创建定时任务和取消定时任务的方法都有,使用起来更方便;但是JUC的老师说Timer的优点是简单易用,但是致命缺点是所有的任务都是被同一个线程执行的,同一时间只能有一个任务执行,剩下任务都得等,执行过程中如果有一个任务发生延迟或者异常都会影响后续其他任务的执行,异常甚至会导致后续任务作废;

    【JUC任务调度线程池使用示例】

  2. java.util.Timer的使用

    • void--->timer.schedule(TimerTask task, long delay, long period)

      • 功能解析:定义定时任务,task是任务对象,TimeTask是抽象类,需要实现抽象run方法;delay是初始延迟时间,单位是毫秒,period是两次执行的间隔时间,单位是毫秒

      • 使用示例

        • 示例含义:定时线程启动后初始延时5秒钟开始每隔十秒打印定时任务执行时间

      • 补充说明

        • 🔎TimerTask实现了Runnable接口

  3. 基于Redis+java.util.Timer+lua脚本来实现自动续期的可重入分布式锁

    • 分布式锁的自动续期lua脚本逻辑

      • 脚本逻辑:如果当前线程上的锁存在,就调用expire指令对锁进行续期,续期成功返回1,如果当前线程的锁不存在,就直接返回0

    • 结合Java代码和lua脚本实现锁的自动续期

      • 这个自动续期的定时调度任务的实现是只让定时调度任务执行一次,续期成功以后原定时调度任务线程直接结束销毁,由原定时调度任务再起一个新的定时调度任务执行下一个续期任务,这样的好处是续期任务定时线程会自己结束,当锁被释放以后定时调度任务就会自动停止设置下一个续期任务,能够灵活地避免考虑自动定时任务自动停止和意外情况下的停止问题,避免意外情况下发生无限续期导致死锁问题的发生

  4. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上述基于Redis+java.util.Timer+lua脚本来实现自动续期的可重入分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

    • 测试结果:

      • 1️⃣:没预热情况下吞吐量553,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

 

小结

  1. 比较完善的基于Redis的分布式锁实现如下

    • 分布式锁DistributedRedisLock

    • 工厂方法获取分布式锁对象

    • 业务方法使用分布式锁示例

      • 业务逻辑是并发请求对100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,操作虚拟机上的同一个Redis数据库的同一个共享数据,对库存数量5000进行单次扣减1,累计5000次扣减请求,使用基于Redis的分布式锁解决Redis中共享库存数据的并发线程安全问题

  2. 该分布式锁实现了以下特性

    • 通过setnx指令做到分布式锁的独占排他,后面用lua脚本一次提交执行和Redis单线程特性保证原子性并结合lua脚本的逻辑判断替换了setnx指令的独占排他

    • 通过设置过期时间来防服务宕机或者意外锁无法释放导致的死锁现象,使用完整的set指令来保证独占排他并同时设置有效时间保证上锁和设置有效时间两步操作的原子性,最后被lua脚本整合到同时实现上锁和锁重入的hincrby指令和Expire指令中,用Lua脚本保证上锁或锁重入与设置有效时间两步操作的原子性

    • 通过Redis中的Hash数据类型,以key作为锁的唯一标识,以key加field字段【当前线程创建的UUID】作为线程自身上锁的唯一标识来防止当前线程误删其他线程上的锁,避免因为锁提前失效或者一系列其他原因导致的非上锁线程执行锁释放操作;后为了在定义方法时就确定同一个线程锁重入的自动识别,uuid很难实现在方法定义时就保证方法锁重入时两把锁的uuid相同,因此将当前线程标识即field字段重新设计为线程id,为了避免集群环境下不同服务实例的线程id相同导致上锁通过锁重入获取锁导致锁失效问题,使用uuid作为服务的唯一标识,field字段使用uuid:线程id结合key作为区分获取锁的当前线程的唯一标识

    • 使用lua脚本保证加锁和设置锁过期时间、判断锁是当前线程上的锁和释放锁、判断锁是当前线程上的锁和为锁续期多步操作的原子性

    • 分布式锁的不可重入也可能会导致死锁,用Hash数据类型,key作为锁唯一标识,key和field【uuid:线程id】作为当前线程的唯一标识来做锁重入、锁释放和锁续期中锁属于当前线程的判断标识、以value作为锁重入次数的计数,该设计模仿可重入锁的锁重入实现方式,用lua脚本保证检查锁和操作锁多步操作的原子性

    • 使用JDK的Timer定时器和lua脚本实现可重入锁的自动续期

    • 在Redis主从集群下这种分布式锁可能会因为主机宕机,从机还没来得及同步上锁数据,从升级为主,另一个请求从新主中获取到锁导致锁机制失效,此时就需要使用红锁算法

  3. 锁内部操作实现的逻辑

    • 加锁操作经过四个版本的迭代,其中上锁重试可以使用递归也可以使用循环重试

      • 版本1:基于Redis一行指令setnx上锁实现独占排他

        • 问题:客户端宕机和不可重入引发死锁、不可重入、扩展功能原子性得不到保障

      • 版本2:基于Redis的完整set <key> <value> [Ex seconds] [Px milliseconds] [NX|XX]指令实现排他独占,上锁和设置锁有效时间两步操作的原子性

        • 问题:不具备可重入性仍然存在死锁问题,引入锁有效期出现锁提前过期,错误上锁导致锁失效,锁被错误释放

      • 版本3:基于Redis的Hash数据类型+lua脚本实现带有效时间解决锁错误释放的可重入锁,lua脚本的上锁逻辑是

        • 1️⃣:根据key即锁的名称使用Redis指令exists判断锁是否占用,如果对应key的记录不存在说明锁没有被占用,可以直接获取锁hset/hincrby并设置锁过期时间expire防客户端宕机导致死锁,用lua脚本保证以上操作的原子性

        • 2️⃣:如果锁被占有则通过key和field【uuid:线程id】判断当前线程是否占有锁的线程hexists,是就进行锁重入hincby并设置锁的过期时间expire

        • 3️⃣:​key对应名称的锁被占用且不是当前线程占用,获取锁失败,使用客户端代码进行重试

        • 问题:没有解决锁提前失效的问题,多个线程可能同时运行导致锁失效

      • 版本4:在版本3的基础上基于JDK的Timer定时器和lua脚本实现锁的自动续期,用延时单次定时任务和上一个定时调度线程成功执行才设置下一个定时续期调度线程的方式避开解决意外状况下的定时任务无法停止的问题,用lua脚本保证判断锁被当前线程占用并为锁续期两步操作的原子性

    • 解锁操作经过三个版本的迭代

      • 版本1:使用Redis单步指令del删除键值对来释放锁

        • 问题del指令不能删除键值对的同时检查键值对的value,可能导致误删

          • 🔎:误删是删除非当前线程占有的锁

      • 版本2:使用lua脚本保证多步Redis指令的原子性的前提下先判断value再删除键值对释放锁

        • 问题:锁重入的情况下uuid很难实现对同一把锁进行锁重入和重入后的释放

      • 版本3:基于Redis的Hash数据类型+lua脚本在实现锁重入的基础上对value重入计数进行累加累减以及用uuid+线程id作为线程唯一标识实现对可重入锁的解锁,lua脚本的解锁逻辑为

        • 1️⃣:判断锁是否当前线程占有的锁,不是直接返回nil,将来直接在程序中抛出异常

        • 2️⃣:如果是当前线程占有的锁,对重入计数直接减1hincrby -1,判断减完后的value是否为0,为0直接删除键值对释放锁,lua脚本返回删除键值对的执行结果,1表示释放成功

        • 3️⃣:如果value值不为0则直接返回0

  4. 基于Redis的分布式锁现存问题

    • 在现有实现的基础上仍然存在单点故障的问题,即目前的分布式锁只存在于单台Redis服务器上,如果该服务器挂掉,所有的分布式锁都会失效

      • 🔎:企业开发一般都使用Redis集群,搭建Redis主从集群,配合哨兵机制在主挂掉的时候将从升级为主

    • Redis集群下的上述分布式锁面临的困难

      • Redis主从集群,对分布式锁的更新操作如上锁都在主中完成,然后主将更新操作通过IO操作记录到RDB日志或者AOF日志中,从从主中拉取日志需要发生网络IO操作,IO操作比较耗时,可能发生从还没有来得及同步更新操作日志主就宕机了,此时配合sentinel哨兵机制从升级为新的主,此时获取锁的线程正在执行业务方法,另一个请求如果也来获取锁,此时新升级的主中并没有同步成功对应的上锁操作,新的请求仍然能成功上锁并同时执行业务方法,此时就可能会发生并发线程安全问题

     

     

红锁算法



  1. 红锁算法的原理

    • 🔎:这个红锁算法实现起来很麻烦,实际开发中很少使用,性能由于多redis节点串行获取锁也很难保证,对redis的集群设置也很偏门,一般互联网公司能搭建一个redis主从集群防单点故障就很不错了,这个老师只讲了实现步骤,没讲原理,没有对其具体实现

    • 红锁算法下的Redis集群

      • 红锁算法下的多个Redis节点没有主从关系也没有哨兵机制,相互之间完全独立,需要部署到不同的服务器或者虚拟机中,相互之间也不知道对方的存在

    • 应用程序10010加锁过程

      • 1️⃣:在获取锁以前应用程序需要获取系统当前时间作为获取锁的起始时间来计算从Redis集群中获取锁的总消耗时间

      • 2️⃣:应用程序依次从所有节点上先后获取锁,使用set、setnx或者hset指令获取锁都可以,看业务要求,但是要保证所有节点都以同样的方式获取锁且需要保证键值对的key和value必须一样;依次获取锁的时候每个Redis节点都要设置超时时间,设置超时时间的目的是为了避免某个节点宕机,客户端一直尝试从该节点获取锁一直获取不到,如果超过指定时间获取不到就要去尝试从下一个节点获取锁,直到所有节点都尝试完了

      • 3️⃣:计算所有节点获取锁消耗的时间,包括获取锁失败的节点的耗时时间,在锁没有自动续期的情况下,当获取锁的耗时小于锁的过期时间才任务锁获取成功【锁没有获取到不会进行续期动作】,且获取锁的耗时时间要远小于锁的过期时间,否则即使获取到锁也认为是获取锁失败了;在该前提下还要满足半数以上的节点【N / 2 + 1个】获取到锁才认为锁获取成功

      • 4️⃣:获取到锁以后应用程序还要通过锁的有效时间减去获取锁总消耗时间得到锁的剩余有效时间,这也是实际锁的有效时间,剩余有效时间到了以后Redis集群自动释放锁

      • 5️⃣:如果获取锁失败了【获取锁的消耗时间大于锁的有效时间或者少于半数的节点成功获取锁】,要对所有的节点释放锁,即使有些节点获取锁失败了该节点也需要释放锁,因为应用程序不知道哪些节点获取锁成功了,应用程序只知道有多少个节点获取锁成功了,所有节点都调用del指令释放锁

    • 应用程序10010解锁过程

      • 1️⃣:解锁直接对每个节点释放锁使用del指令删除键值对释放锁即可,获取锁成功的直接删除锁,获取锁失败的删除锁失败也无所谓,根据业务场景需要决定是否使用lua脚本保证原子性或者防误删

       

 

Redisson



配置Redisson

  1. 引入依赖

    • pom.xml

      • 🔎我严重怀疑有场景启动器,这里的配置是使用原生redisson的配置【经过后期确认,Maven仓库确实有对应的场景启动器依赖org.redisson.redisson-spring-boot-starter,配置示例后面遇到再补充】

  1. 配置

    • Properties中没有提供对应的Redisson相关的配置项,不能在SpringBoot的默认配置文件中对Redisson进行配置,具体的配置方法可以参考文档的Configuration【配置方法】章节,可以通过代码、文件的方式对Redisson进行配置

    • Redisson可以通过用户提供的YAML格式的文本文件来进行配置,该YAML文件需要通过调用静态方法Config.fromYAML(new File("config-file.yaml"))来创建Config配置类对象,通过配置类对象调用静态方法Redisson.create(Config config)来实例化RedissonClient对象,这个RedissonClient对象就类似于操作Redis的StringRedisTemplateRedisson通过该对象实现对Redis的所有操作,配置示例如下

    • YAML的文件配置方式比较麻烦,程序化配置相对简单方便,程序化配置的方法是构造一个Config对象,调用对象的实例方法为该对象设置指定的参数,配置示例如下,推荐使用程序化配置方式

      • Redis地址必须以redis://开头,后面跟redis服务器的ip和端口号

  2. 配置Redis集群的模式

    • 针对Redis以不同模式构建,RedissonClient的配置方式不同,但是也是大同小异,Redis常见的构建模式包括集群模式、云托管模式、单Redis节点模式、哨兵模式、主从模式、分片模式;配置对应模式的代码如下

      【单机模式】

      【分片模式】

      • 参数addresses是一个可变长度字符串类型参数,在分片模式下要传递多个Redis节点的地址

      【自定义模式】

      【主从模式】

      【副本模式】

      【哨兵模式】

  3. 单Redis节点模式下的程序化配置方式

    • 如果redis在本机,Redisson可以直接使用create方法以默认连接地址127.0.0.1:6379初始化RedissonClient,示例如下

    • 如果redis不在本机,配置redis的模式、配置redis服务器地址和端口并用Redisson.create(config)方法初始化RedissonClient,示例如下

    • 常用配置示例

    • 最小配置示例

      • 这只是最简单的配置,定制化的配置还有很多,需要翻阅文档,在该配置下可以使用Redisson提供的:

        • 🔎:同时也意味着其他的配置都是可选配置,但是老师说生产环境上面的常用配置项都需要进行配置

        • 基于Redis的分布式锁RLock

      • Redisson是接口RedissonClient的实现类,Redisson的构造方法使用protected修饰,不能在包以外的地方调用,用户无法使用该构造方法;用户需要使用create方法来创建RedissonClient对象

        • 无参的create()方法默认写死了本机的6379作为redis服务器地址,也是调用有参的create方法创建RedissonClient对象

        • 有参的create(Config config)方法通过调用Redisson的构造方法传参config来创建RedissonClient对象

RedissonClient相关API



 

  1. RLock--->redissonClient.getLock(String name)

    • 功能解析:获取一个名为name的分布式锁对象

    • 使用示例RLock lock = redissonClient.getLock("lock")

      • 示例含义:获取一个名为lock的基于Redis的分布式锁对象

    • 补充说明

  2. RBloomFilter<V>--->redissonClient.getBloomFilter(String name)

    • 功能解析:获取布隆过滤器

    • 使用示例:``

      • 示例含义

    • 补充说明

  3. RBloomFilter<V>--->redissonClient.getBloomFilter(String name,Codec codec)

    • 功能解析:获取布隆过滤器

    • 使用示例:``

      • 示例含义

    • 补充说明

  4. RAtomicDouble--->redissonClient.getAtomicDouble(String name)

    • 功能解析:获取原子操作对象RAtomicDouble

    • 使用示例:``

      • 示例含义

    • 补充说明

  5. RAtomicLong--->redissonClient.getAtomicLong(String name)

    • 功能解析:获取原子操作对象RAtomicLong

    • 使用示例:``

      • 示例含义

    • 补充说明

 

RLock



 

  1. RLock原理

    • RLock的锁以用户获取锁传入的字符串作为key,以uuid作为filed,以重入次数作为value,同样使用Hash数据类型标识锁对象;RLock也继承了JUC的Lock接口,RLock本身是一个接口

    • RLock的lock方法被三个子类RedissonLockRedissonMultiLockRedissonSpinLock实现了,这里只关注RedissonLock对lock方法的实现逻辑,RedissonLock继承自RedissonBaseLockRedissonBaseLock实现了RLock接口,即Redisson默认通过redissonClient.getLock("lock")获取到的锁是RedissonLock,其中的lock方法是对JUC的Lock接口的lock方法的实现,该锁本身就叫可重入锁,可重入原理也是和上面一样的

    • RedissonLock的lock方法

      • 该异步上锁方法tryLockInnerAsync底层就是执行了一个Lua脚本,这个脚本和上面我们自己实现的基于Redis的分布式锁几乎是一模一样的

        • 该lua脚本上锁的逻辑是如果锁没有被占用就获取锁并设置有效时间并返回nil;如果锁存在就判断是否是当前线程的锁,是就对重入次数加1重新设置有效时间并返回nil;如果获取不到锁就调用Redis的pttl指令去获取键值对的毫秒级别过期时间

          • 🔎:ttl指令获取的是秒级别的过期时间

      • 上锁成功以后的自动续期会调用scheduleExpirationRenewal(threadId);方法,最底层是在newTimeout方法中使用netty的时间轮实现io.netty.util.HashedWheelTimer去做定时任务的,实际上Redisson的早期版本使用的就是JUC里面的Timer实现的定时重置过期时间,后面更新的版本中才将Timer换成了netty里面的时间轮;

        • 续期脚本在renewExpirationAsync(long threadId)方法中,也和我们此前的续期逻辑是一样的,先判断锁是否当前线程的锁,如果是就去使用Redis的pexpire指令执行毫秒级别精度的设置键值对的有效期,重置成功返回1,失败返回0;

        • 而且也是使用的如果重置有效时间成功还会再去开启一次重置任务,当前任务定时任务会直接销毁的方式

      • 这里面大量使用JUC提供的异步工具类CompletableFuture进行异步编排,性能比我们实现的分布式锁要好很多

      • 注意Redisson实现的RedissonLock是阻塞锁,获取不到锁就直接阻塞等待了,直到当前线程释放锁才有机会再去抢占锁,不是采用循环重试的方式来抢占锁

    • RedissonLock的unlock方法

      • redissonLock.unlockInnerAsync(threadId)方法中定义了解锁的lua脚本,解锁逻辑是先判断锁是否当前线程的锁,不是当前线程的锁直接返回nil;如果是当前线程的锁,定义一个局部变量counter,该局部变量用于接收value减1后的值,如果减1后的计数counter大于0说明锁没解完,重新设置锁的有效时间并返回0;如果减1后的计数counter小于等于0直接删除键值对解锁,使用redis的指令publish发布订阅以后直接返回1表示解锁成功;如果是其他情况仍然返回nil,和我们自己实现的原理是一样的,并在该方法中执行解锁操作

      • 释放锁成功以后调用取消定时续期任务的方法cancelExpirationRenewal(threadId)

       

  2. 用法示例1

    • 最常用的就是RLock的无参加锁lock()方法和unlock()方法

    • 这就是使用基于Redis的分布式锁的扣减库存完整代码,相当于将分布式锁封装好了,只需要使用lock方法上锁,unlock方法解锁,业务方法正常调用StringRedisTemplate对redis中的共享数据进行操作即可就能实现并发线程安全

      • 这个共享数据不一定是必须存redis中,分布式锁是限定所有请求都去竞争系统中都可以获取的唯一的锁,系统中所有要获取同一把锁的请求都串行执行业务方法

      • 🔎:注意,老师还是在业务方法中使用StringRedisTemplate来操作redis中的共享数据的,没有使用redissonClient

      • 🔎:注意,前置工作还包括上面的引入Redisson依赖、配置RedissonClient、实例化RedissonClient对象并将该对象注入容器

    • 测试

      • 测试环境

        • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用Redisson的分布式锁并使用上述代码对redis中的共享库存数量5000进行单次扣减1,累计5000次扣减请求

      • 测试结果

        • 1️⃣:没预热情况下吞吐量873,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

      • 结果分析

        • 1️⃣:性能比我们自己封装的基于Redis的分布式锁吞吐量600要好很多,这种专业框架底层会使用很多技术对性能做优化

  3. 用法示例2

    • RLock的重载lock方法rLock.lock(long leaseTime,TimeUnit unit)的作用是当前线程获取分布式锁,给锁设定有效时间并指定时间单位,当超过有效时间时锁会自动释放,锁生效期间可以手动释放锁,一定注意这个方法是获取锁leaseTime时间间隔以后自动释放锁,锁被释放以后再手动释放锁会抛异常

      • 这里会存在一个问题,使用这个方法上锁,如果指定时间为10s自动释放,而锁自动续期时间间隔也是10s,如果使用该方法锁是不会自动续期的,如果此时业务方法还没有执行完,其他的请求线程就能抢占锁了,不仅会发生线程安全问题

      • 调用rLock.lock(long leaseTime,TimeUnit unit)方法根本不会执行到给锁续期的代码,在下面的1️⃣-1️⃣-1️⃣-1️⃣执行后就直接返回了,如果获取锁成功直接用用户指定时间作为锁的有效时间,到了有效时间就直接自动释放了,这可能可以解释为什么后面基于Zookeeper实现的分布式读写锁,用户的请求线程都已经响应了,但是读锁或者写锁仍然会等到指定时间到了以后才会被释放;

      • 如果是无参数上锁方法rLock.lock()没有指定锁的自动释放时间,会以org.redisson.config.Config中看门狗超时时间属性private long lockWatchdogTimeout = 30 * 1000;默认的30s作为锁的过期时间,并设置一个定时重置过期时间的任务,在下面1️⃣-1️⃣-1️⃣-2️⃣-1️⃣的newTimeout(new TimerTask() {}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);指定这个定时任务的触发时间是获取锁后1/3锁的有效时间即看门狗超时时间的三分之一即10s钟将锁的默认有效时间进行重置

    • 加锁方法rLock.lock(long leaseTime,TimeUnit unit)源码

  4. 用法示例3

    • rLock.tryLock(100,10,TimeUnit.SECONDS)方法的作用是尝试上锁,最多等待100s的时间,100s还是获取不到就放弃抢占锁;如果上锁成功锁的有效期被设置为10s,到了10s以后会自动释放锁

FairLock公平锁



 

  1. 代码示例

    • 在请求参数中按顺序发起几个请求,发送时间间隔小于10s,每次请求都传参一个从1开始的整数id,每次请求递增1

      • 最后的运行效果是,控制台每隔10s输出一个数字,输出数字的顺序是按照请求顺序依次输出1、2、3、4、5、6;说明确实是先进入阻塞队列的请求优先获取锁,证明了该锁的公平性;使用RedissonLock重试一次就会发现id输出顺序是随机的

        • 🔎:使用RedissonLock测试的时候会发现一个请求同时被两个服务实例都接受到并都在控制台输出了相同的id,一个实例输出一次,这是nginx的分发机制导致的,因为这里一直重试迟迟获取不到锁,nginx等待超过一定时间得不到响应就会将请求进行重发,将nginx的请求重发禁用或者将超时时间设置的很长如proxy_connect_timeout 12000;【设置连接超时时间】、proxy_send_timeout 12000;【设置请求发送超时时间】、proxy_read_timeout 12000;【设置响应超时时间】就不会出现这种情况了

      • 对应的Redis中多了一个key为redisson_lock_queue(1)的锁队列,其中保存着正在排队的线程信息,而且该队列是有顺序保证先进入队列的线程先获取锁

     

 

RedissonMultiLock联锁

  1. 概念:

    • 联锁是需要至少三台Redis服务器,每个Redis服务器都对应的一个redissonClient实例,每个实例去对应的redis服务器中获取一把RLock锁,这些锁的名称最好是一样的,也可以不同;通过各个RedissonClient实例获取到指定名字的锁,通过联锁对象RedissonMultiLock的构造方法传参三个锁对象【从这里看得出来,三个redissonClient实例都在一个客户端应用程序上,否则无法传参三个锁对象到一个方法里】返回一个RedissonMultiLock锁对象,通过该对象的lock方法和unlock方法来实现加锁和解锁

  2. 用法示例

  3. 特点

    • 联锁通过redissonMultiLock上锁成功的标志是联锁构造方法参数列表的每个锁都上锁成功,联锁才算上锁成功,只要有一个锁上锁失败,联锁就上锁失败;这意味着只要有对应的任何一台redis服务器宕机,这个联锁上锁就一定会失败,因此这个锁的使用场景很有限,尽量避免使用

 

RedissonRedLock红锁

  1. 概念

    • RedissonRedLock实现了Redis的红锁算法,该对象也可以将来自不同RedissonClient实例对象获取到的锁对象通过RedissonRedLockd的构造方法关联为一个红锁,对应的含义是从不同的Redis服务器获取到的锁关联为一个红锁对象,通过红锁对象的lock方法加锁、unlock方法解锁;

      • 🔎:比联锁好在大部分节点加锁成功加锁就算成功,Redis给出的标准是大于节点数的一半加1,红锁了解一下就行,实际企业开发使用红锁的寥寥无几

  2. 用法示例

     

 

RReadWriteLock读写锁



 

  1. 概念

    • 基于Redis的Redisson分布式可重入读写锁,该对象实现了JUC中的java.util.concurrent.locks.ReadWriteLock接口,读锁和写锁都继承了RLock接口,JDK中的读写锁是读读并发,读写和写写互斥

      • 读写不能并发设计目的是为了避免写的过程中读到脏数据,比如Mysql中的select for update,执行该语句会直接上悲观锁阻塞其他写操作,但是不会阻塞select操作,在修改还没有提交以前读取到旧数据,在特定场景下比如减库存的场景下就会导致超卖现象,因此有些场景下要控制不能读写并发;JUC的读写锁的实现原理是读锁是Shared类型的锁,写锁释放前检查AQS中的后续节点,如果是Shared类型的节点就释放连续一段Shared类型节点直到再次遇到要上写锁的独占类型节点,写锁等到所有的读锁都释放掉了才会被唤醒尝试获取锁

        • 🔎:普通的锁也只是实现多线程写互斥,读操作如果不加锁,在写的过程中读操作不会被阻塞,即普通的锁也是读写并发的,因为使用普通的锁一般不会对读操作上锁,此时读就可能读到脏数据;但是读写锁读写也是互斥的,普通的锁做不到读写锁的效果

      • 但是有些场景下对一致性要求不那么严格也有对应读写并发的实现,比如JUC中的CopyOnWriteArrayList,支持多线程读单线程写,实现原理是更新操作通过复制一个新的数组来执行更新操作,读取操作还是在旧数组中进行

    • Redisson读写锁在redis中保存在key为rwLock的Hash数据中,Hash中为读锁时可以存在多条记录,但是为写锁时只能存在一条记录,第一行记录会保存一个field为mode的记录,value值为write或者read用来表示锁的类型

  2. 用法示例

    【手动释放锁】

    【自动释放锁】

     

 

Semaphore信号量



 

  1. 用法

    • 代码示例

      • 🔎:Redisson中的RSemaphore和JUC中的Semaphore的用法是一样的,获取许可也是acquire方法,释放许可也是release方法,该对象能限制系统内想要通过同名RSemaphore获取许可的线程数量,系统内超过许可数量的线程再想获取同名RSemaphore的许可会被阻塞

      • 🔎:注意这个Semaphore是通过RedissonClient获取的,即使在不同方法中获取,因为redissonClient是单例的,所以只要Semaphore的名字是相同的,通过redissonClient.getSemaphore("semaphore");获取到的Semaphore就是同一个实例对象,而且因为是基于第三方Redis实现的,即使线程不在一个服务也能通过名字共享一个Semaphore对象的信息

      • 🔎:必须通过如semaphore.trySetPermits(3);设置许可数量,否则默认许可是0会导致一个请求都进不来

      • 🔎:使用RSemaphore会在redis中生成一个和预设名semaphore相同的键值对,value是信号量的当前许可计数,但是特别注意,RSemaphore使用完以后,同名的RSemaphore键值对会一直保持第一次设置的允许同时运行的最大线程计数,因为Redis不知道该键值对何时还会再使用,因此不会删;这也会导致后续如果其他线程获取了同名RSemaphore且想通过semaphore.trySetPermits(5);重新设置允许同时运行的最大线程计数是设置不了的,最大允许计数还会保留第一次设置的3,必须将Redis中的对应键值对删掉重新获取并设置允许同时运行的最大线程计数才行

  2. 常用API

 

PermitExpirableSemaphore可过期性信号量


  1. 验证代码及用法示例

    • 代码示例:

       

       

CountdownLatch闭锁



  1. 用法示例

    • 代码示例

      • 只要key的名字相同,所有微服务就能通过redis中的以自定义名称studentCount来获取对应的键值对和计数信息,通过乐观锁机制进行减计数,因此能控制系统内多个服务的线程计数

      • 注意当计数减为0以后redis中的键值对会消失

      【控制器方法】

      • 班长先锁门被阻塞,当出门请求处理6次以后班长锁门放行

       

基于zookeeper实现分布式锁


  1. Zookeeper安装、指令、节点类型相关内容参见Linux指南--安装Zookeeper

 

Java客户端



  1. 使用官方Zookeeper客户端

    • 引入依赖

      • 在zookeeper的客户端中已经引入了slf4j-log4j12,如果已经在其他地方也引入就会有Slf4j日志标红提示,老师的解决办法是从zookeeper的依赖中移除slf4j-log4j12

        • 我认为这里老师讲的是错误的,因为maven会自动处理重复的依赖项,除非两个相同依赖的版本不一致,另一方面从依赖树形结构图中没有找到期望被移除的slf4j-log4j12,从报错信息上来看是logback-classic1.2.11中的org.slf4j.impl.StaticLoggerBinder.classslf4j-reload4j.1.7.36中的org.slf4j.impl.StaticLoggerBinder.class两个类发生了冲突,通过网络搜索发现网上那个和slf4j-log4j12冲突的报错信息确实是slf4j-log4j12-1.7.25.jar,总之就是logback和log4j之间关于org/slf4j/impl/StaticLoggerBinder.class这个类发生的冲突,不同jar包下的相同的全限定类名的类在不破坏JVM的双亲委派模型类加载机制情况下全限定类名相同的类只会加载被先加载的jar包中的对应类,jar包的加载顺序和classpath参数有关,包路径越靠前越先被加载,加载顺序靠后的jar包中的全限定类名相同的类会被直接忽略掉不会再被加载,SpringBoot的默认日志是logbacklog4j是以前的主流日志,很多第三方工具包都使用的是log4j,解决办法是排除logback或者log4j的其中一个,让整个项目使用其中的一种日志;【具体原因还需要深入分析】

          【报错信息】

          【springboot排除logback依赖实例】

          • 排除logback就要把项目中所有的logback都排除,只使用log4j

      【正常依赖】

      【老师的排除slf4j-log4j12示例】

      【实测排除slf4j-reload4j也行】

  2. zookeeper客户端对象的获取

    • Zookeeper对象通过构造方法创建,构造方法只能是有参构造,必须传参连接字符串connectString、连接超时时间sessionTimeOut、监听器watcher,该对象使用完以后必须调用close方法来手动关闭连接,注意关闭连接就相当于客户端已经断开连接了,该客户端创建的临时节点都会被zookeeper服务器删除

      • 连接字符串connectString:格式必须为"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002",参数值是Zookeeper服务器集群的地址

        • 用逗号分隔各地址,注意逗号两边不能有空格

      • 连接超时时间sessionTimeOut:参数为int类型,单位是毫秒

      • 监听器Watcher:一个接口,需要使用匿名内部类的方式重写process方法来实例化对象,process方法会在连接建立时和连接关闭时各执行一次

      • 注意调用完Zookeeper的构造方法以后还在获取连接程序就会执行后续的代码,此时zookeeper对象只是赋值了对象地址因为建立连接较慢还没有完成初始化,其中的功能是无法正常使用,此时需要使用闭锁CountdownLatch来实现对zookeeper初始化进行等待的效果

    • zookeeper对象的初始化示例

    • 使用闭锁CountdownLatch等待zookeeper对象初始化完成

      • 注意,两次process方法回调时传参的WatchedEvent对象的state属性值是不同的,第一次获取连接是SyncConnected,关闭连接时是Closed,可以根据两个属性值来区分是获取连接还是关闭连接,该属性值的类型是枚举Event.keeperState,可以通过该属性值和枚举值对比决定是否需要放行countDownLatch.await()从而继续执行获取到zookeeper连接后的动作

        • :不是异步获取连接吗为什么这里属性值是同步连接

      【优化后的通用模板代码】

    • 上述代码还不够完善,因为节点事件的监听回调依然会执行process()方法,此时process方法传参的event和获取连接时回调process方法的event分别为WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/earl[1]WatchedEvent state:SyncConnected type:None path:null;注意这些event中的state属性为SyncConnected,和关闭连接时的WatchedEvent state:Closed type:None path:nullstate属性Closed不同,由此区分关闭连接和其他事件;可以通过event参数的type属性来区分事件类型从而执行不同的回调逻辑,在state属性为SyncConnected的前提下,当typeNone时表明回调由成功获取连接发起,当type属性为事件类型时表明回调由事件发起;此外注意event中的path存储了事件监听节点的路径,通过该路径可以制定不同节点的事件回调逻辑;由此可以将节点事件回调分成获取连接、关闭连接、节点事件三个大类执行对应的回调逻辑,对节点事件可以通过事件节点路径来区分执行不同的回调逻辑,示例代码如下

      • 在Zookeeper对象的构造方法传参watcher对象中通过event对象的state属性、type属性和path属性来分区获取连接回调、关闭连接回调和不同类型不同节点的节点事件回调

      • 节点事件回调直接在Zookeeper构造方法传参中写一起不优雅,不方便读和改,判断逻辑复杂;业界常用的方式是在节点事件监听方法中传参Watcher匿名实现重写process方法来自定义节点事件的回调逻辑,连续回调需要对节点事件方法进行封装,通过在回调方法中递归调用该方法来实现连续回调

       

       

API

  1. String ---> zookeeper.create(final String path,byte[] data,List<ACL> acl,CreateMode createMode) throws KeeperException, InterruptedException

    • 功能解析:创建指定数据、指定数据下的节点,返回值为被创建节点的路径

    • 使用示例zooKeeper.create("/earl/testJavaClient", "Hello zookeeper".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

      • 示例含义:创建一个路径为/earl/testJavaClient,数据内容为Hello zookeeper,允许所有客户端对该节点进行任何操作的永久节点

    • 补充说明

      • 数据内容data要求传参一个byte数组,可以通过字符串.getBytes()方法获取对应的byte数组

      • 权限List<ACL>有专门的枚举类ZooDefs.Ids,常用的三种权限包括

        • ZooDefs.Ids.OPEN_ACL_UNSAFE:所有客户端都可以对创建的节点做任何操作

        • ZooDefs.Ids.CREATOR_ALL_ACL:创建节点的客户端可以对节点做任何操作

        • ZooDefs.Ids.READ_ACL_UNSAFE:所有客户端都能对创建的节点做读取操作

      • 节点类型createMode也使用专门的枚举类CreateMode,对应节点类型有以下四种

        • CreateMode.PERSISTENT:创建的节点是永久节点

        • CreateMode.EPHEMERAL:创建的节点是临时节点,这种方式创建的临时节点在调用zookeeper.close()方法后节点会直接被zookeeper服务器秒删

        • CreateMode.EPHEMERAL_SEQUENTIAL:创建的节点是序列化临时节点

        • CreateMode.PERSISTENT_SEQUENTIAL:创建的节点是序列化永久节点

  2. Stat ---> zookeeper.exists(String path, boolean watch) throws KeeperException, InterruptedException

    • 功能解析:判断指定路径节点是否存在,如果返回值为null说明对应节点不存在,如果返回值不为null说明对应的节点存在;第一个参数是指定节点路径,第二个参数是指定是否要监听,指定true表示要监听时间,指定false表示不监听

    • 使用示例zooKeeper.exists("/earl/testJavaClient", false);

      • 示例含义:查询路径为/earl/testJavaClient的节点是否存在

    • 补充说明

      • exists方法相当于zookeeper中的stat指令,可以通过该方法的重载方法来做对节点删除和节点创建事件的监听

  3. byte[] ---> zookeeper.getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException

    • 功能解析:查询已经存在的指定路径节点的内容数据,这里传参statzooKeeper.exists("/earl/testJavaClient", false);的返回值,暂时认为要查询指定节点的内容数据必须先查询该节点是否存在。第二个参数是指定是否要监听,指定true表示要监听时间,指定false表示不监听

    • 使用示例zooKeeper.getData("/earl/testJavaClient", false, exists);

      • 示例含义:查询已经存在节点/earl/testJavaClient的数据内容

    • 补充说明

      • getData方法相当于zookeeper中的get指令,可以通过该方法的重载方法来做对节点的数据变化监听

  4. List<String> ---> zookeeper.getChildren(String path, boolean watch) throws KeeperException, InterruptedException

    • 功能解析:查询一个指定节点下的全部子节点

    • 使用示例zooKeeper.getChildren("/earl", false);

      • 示例含义:查询节点/earl下的所有子节点

    • 补充说明

      • getChildren方法相当于zookeeper中的ls指令,可以通过该方法的重载方法来做对子节点的创建、删除、数据内容变化监听

  5. Stat ---> zookeeper.setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException

    • 功能解析:更新一个指定节点的数据内容,第三个参数version需要使用exists方法查询获取Stat返回值,用stat.getVersion()来获取指定路径节点的版本号,如果更新时发现当前数据版本号和传参不一致,更新操作就会失败;即更新操作也需要事先查询指定节点是否存在且获取Stat返回值作为更新方法传参

    • 使用示例zooKeeper.setData("/earl/testJavaClient", "hello earl".getBytes(), exists.getVersion())

      • 示例含义:把节点/earl/testJavaClient下的数据内容更新为hello earl

    • 补充说明

      • 版本号也可以指定为-1,表示更新操作不关心版本号,本次更新操作一定会成功

  6. void ---> zookeeper.delete(final String path, int version) throws InterruptedException, KeeperException

    • 功能解析:删除指定路径节点,如果当前版本号和指定版本号不一致则删除失败

    • 使用示例zooKeeper.delete("earl/testJavaClient",exists.getVersion());

      • 示例含义:删除节点/earl/testJavaClient

    • 补充说明

      • 版本号也可以指定为-1,表示删除操作不关心版本号,本次删除操作一定会成功

      • 如果要删除的节点不存在仍然执行了该方法会抛出异常,注意凡是涉及到事件监听的方法调用,调用事件监听方法的线程在事件发生前不能提前结束执行,事件发生时线程运行结束会导致无法执行事件监听成功后的回调,因此调用事件监听方法的线程不能在事件发生前结束,不能在等待期间发生异常,发生了异常如果没有捕获处理也会直接结束当前线程

     

 

基础实现



  1. 基础实现

    • 原理:

      • 通过组件标注了@PostConstruct注解的init()方法可以在项目启动时就去创建Zookeeper客户端对象,Zookeeper的连接属于长连接,项目正常运行期间不需要关闭连接;通过组件中标注了@PreDestory注解的destory()方法在Spring容器销毁前执行对应的释放连接的代码,这样就实现了不用每次获取锁都去重新建立连接,虽然第一次启动会变慢,但是用户感觉不到,且能提升分布式锁的性能

        • 🔎:项目启动时初始化Spring容器,此时会去扫描标注了注解@Component的类,通过对应类的无参构造方法来初始化对应的对象,在init方法上标注了@PostConstruct注解,该init方法会在类的无参构造方法执行以后立即执行,通过这种方式可以保证项目启动时就会立即执行init()中的方法;通过标注了@PreDestory注解的destory()方法可以实现在容器组件销毁前执行其中如释放连接的代码

      • 每次去获取锁对象都去检查Zookeeper中锁的根节点/locks是否存在,避免创建锁节点/locks/lockName时因为父节点不存在而连续失败

      • Zookeeper的临时节点天然就能解决因服务器宕机导致的死锁问题,因为服务器宕机会导致客户端和Zookeeper的连接断开,此时Zookeeper服务器就会自动删除对应客户端的临时节点,从而达到服务器宕机自动释放锁的效果,因此基于Zookeeper实现的分布式锁不需要考虑给锁设置有效时间防服务器宕机死锁的问题,也不需要考虑使用定时任务来给锁不断续期的问题

      • 释放锁不需要考虑版本号直接删除对应节点即可

    • 代码示例

      【分布式锁工厂类】

      【基于Zookeeper实现的分布式锁】

      【业务代码】

    • 测试

      • 测试环境:

        • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉基于Zookeeper临时节点实现的不可重入分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

      • 测试结果:

        • 1️⃣:吞吐量300,系统预热后压测性能也只有380,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

 

阻塞公平锁优化



  1. 原理:

    • 通过序列化临时节点可以让每个创建锁节点的线程都去创建同一个名字的锁节点,这些锁节点的序列号不一样,我们可以将序列号最小的节点视为获取到锁,这样每个请求线程一次执行就能获取到锁对象

      • zookeeper.create()方法创建节点成功会返回节点的完整路径,通过该路径我们可以分离出序列号,为了方便分割锁的名称和序列号,在锁名称后面加一个标识符-,而且还可以整体作为前缀准确区分一各分布式锁【比如使用StringUtils.startWith(),前缀加一个标识符能标记锁名称的开始和结束,这样不会方便同一个分布式锁的区分】,想办法通过序列号获取前置节点,如果前置节点为null,说明是第一个节点就直接去执行业务方法;如果前置节点不为null,说明当前线程应该被阻塞监听前置节点删除对应节点并释放锁后执行业务代码并删除锁释放前置节点

    • 其他的节点对应的请求线程去监听序列号从小到大排序比自己小的在自己前面节点的删除事件,通过这种监听前一个节点的事件和回调来实现请求线程未获取锁阻塞的效果;

      • :这里存在一个问题,一个服务宕机了会自己删节点,后继节点会被唤醒,但是此时后继节点不知道还有没有前驱节点,可能发生宕机导致锁不住的情况出现线程安全问题

    • 这种方式因为Zookeeper的特性不仅实现了阻塞锁还实现了公平锁,先创建节点的线程序列号更小会先获取锁

    • 获取前驱节点的逻辑:

      • 1️⃣:获取根节点下的所有子节点,一般能获取成功,获取失败都属于是不正常的情况,可以尝试抛一个获取锁常用的IllegalMonitorStateException,如果子节点集合为空集合也说明有问题也抛该异常,因为获取前驱节点就是为了给刚创建的节点添加监听事件,所以子节点集合不应该为空;

      • 2️⃣:注意获取到的子节点包含所有商品库存的锁,不同的商品应该可以并发,注意序列号不管是不是同一个商品的锁整体每个序列号都是唯一的,所以前驱节点不能简单地只根据序列号进行判断,要根据场景进行设计,在所有分布式锁都在一个父节点下,就要根据锁的名称来对分布式锁分类,只对同一类的分布式锁序列号进行排序找到前驱节点,获取到同一类锁的集合并对该集合判空,因为所有分布式锁的集合不为空但是一个分布式锁下的节点集合可能为空,其实也不可能,因为刚刚创建了一个该分布式锁下的节点还没删,为空肯定是出问题了,这种情况也直接抛异常

        • 这个设计属实有点简陋,考虑使用单调栈或者数据结构优化一下才行,不然每个请求都去获取一遍所有子节点又对锁分类排序,性能不可能好

      • 3️⃣:如果集合不为空判断当前节点在集合中的下标位置,如果下标位置小于0直接抛异常,下标位置大于0则返回前驱节点,下标位置为0则说明没有前驱节点,直接返回null

        • 注意这里获取前置节点的代码是没有获取锁时的操作,此时还在获取前驱节点,前驱节点可能已经将锁释放掉删除节点了,此时再监听前驱节点的删除事件已经没有意义了,如果将该节点设置为前驱节点会导致当前节点永远不会被唤醒,因此,在设置监听前驱节点事件前还需要检查前驱节点是否还存在

          • :但是感觉还是有问题,因为没有获取到锁,再检查到执行监听事件期间前驱节点还是可能释放锁,导致删除事件永远不会触发

          • 🔑:因为ls指令即Exist方法在判断节点是否存在的同时会给对应节点添加监听事件,暂时认为判断节点存在和添加监听事件这一条ls指令是原子性的,认为Zookeeper内部对节点的操作是上了锁的【Redis是通过单线程来保证指一条指令的原子性,但是Zookeeper不清楚是否保证了单条指令执行的原子性或者对节点的更新和监听操作是否上锁】,否则这里肯定有问题,老师并没有做解释

      • 4️⃣:如果前驱节点存在,检查前驱节点是否仍存在Zookeeper服务器中,如果仍然存在则监听前驱节点的删除节点事件,如果Zookeeper服务器中前驱节点已经不存在了或者List集合中没有前驱节点tryLock方法直接返回true表示上锁成功,直接执行业务方法即可

      • 5️⃣:如果成功设置对前驱节点的删除事件监听,当回调方法调用时直接结束方法执行返回true表示当前线程成功获取锁,使用闭锁CountdownLatch来阻塞当前线程,在回调方法中让锁计数减1来唤醒当前线程,这意味着回调线程和调用监听节点事件的线程不是同一个线程

  2. 代码示例

    【分布式锁工厂类】

    【基于Zookeeper实现的分布式锁】

    【业务代码】

  3. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉基于Zookeeper临时序列化节点和事件监听实现的阻塞式不可重入分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

    • 测试结果:

      • 1️⃣:吞吐量559,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

 

锁重入实现



 

  1. 原理

    • ThreadLocal<Integer>中获取锁重入计数,如果获取的计数为null或者小于等于0,说明当前线程还没有上锁,执行竞争锁逻辑;如果获取的计数大于0说明当前线程已经获取锁,在该计数上加1表示锁重入次数加1;首次获取锁时将锁重入计数置为1;

    • 解锁时先将锁重入计数-1,因为只有获取锁的线程才可能执行解锁方法,此时锁重入计数必然大于等于1;判断减1后的锁重入计数是否为0,如果为0彻底释放锁,如果不为0说明锁还没释放干净,直接返回不能删除锁

  2. 代码示例

    【分布式锁工厂类】

    【基于Zookeeper实现的分布式锁】

    【业务代码】

  3. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上诉基于Zookeeper临时序列化节点和事件监听实现的阻塞式可重入分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

    • 测试结果:

      • 1️⃣:吞吐量612,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

 

小结

  1. Zookeeper分布式锁的特点

    • ZNode临时节点不可重复,通过都去自旋重试创建同一个ZNode节点实现独占排他

    • 通过序列化临时节点和监听前驱节点的删除事件可以实现竞争锁的线程阻塞等待,新创建节点根据序列号检查是否有前驱节点,如果没有直接获取锁,如果有且设置监听事件时仍存在则当前线程使用闭锁阻塞,在前驱节点的删除事件回调中让闭锁计数减至0来放行当前线程;如果前驱节点在设置监听事件时已经不存在了也说明当前线程可以直接获取锁

    • 通过序列化临时节点的阻塞队列还实现了公平锁,先创建节点的线程先获取到锁

    • 由于Zookeeper客户端断开连接对应客户端的临时节点全部删除的特性,也天然促成分布式锁不会因为服务器宕机导致的死锁问题,从而无需考虑锁有效期、续期、和正确释放当前线程的锁的问题

    • 通过ThreadLocal对当前线程的锁计数进行记录,加锁是如果锁重入计数大于0,说明当前线程已经获取到锁,当前加锁为锁重入,直接累加锁重入计数即可,如果锁重入计数为null或者0,说明当前线程还没有获取到锁,当前线程进入竞争锁的逻辑;释放锁时对所重入计数减1,比较减1后的锁重入计数是否为0,为0说明锁释放干净了,可以删除对应节点释放锁;如果减1后的锁重入计数大于0,说明锁还没有释放干净,直接返回等待后续锁释放干净才删除对应节点;锁重入实现也能防止因为锁不可重入当锁重入发生时带来的死锁问题;此外可重入还可以借助Zookeeper节点的数据内容来作为锁重入计数来实现

  2. 和基于Redis实现的可重入分布式锁的对比

    • 对于防客户端宕机导致的死锁现象,基于Redis实现通过给锁设置有效时间来解决,并需要解决由此引发的锁提前过期和其他线程错误获取锁以及错误释放锁的问题;基于Zookeeper实现通过临时节点当客户端宕机与Zookeeper服务器断开连接,Zookeeper服务器心跳检测不到客户端程序就会直接删除对应客户端创建的临时节点来删除锁

    • 对于可重入实现,基于Redis实现通过Redis中的Hash数据模型、以UUID作为服务器标识,用线程id作为线程标识,value作为锁重入计数来实现的;基于Zookeeper实现通过ThreadLocal存储锁重入计数或者对应节点的数据内容来实现,也可以通过ConcurrentHashMap来实现,比如Zookeeper客户端Curator就是基于ConcurrentHashMap来实现的

    • 对于防误删,基于Redis实现通过UUID标识服务实例,用线程id标识请求线程,删除前检查当前的锁是否是当前线程获取到的锁,并使用Lua脚本来保证该过程的多步操作原子性来实现防误删;基于Zookeeper实现通过为每个请求线程创建唯一的临时序列化节点,释放锁也只删除对应当前线程的临时节点,不会影响到其他线程创建的临时节点,通过该方式实现锁的防误删

    • 对于原子性操作,基于Redis实现通过Lua脚本的一次提交执行多条Redis指令来保证原子性;基于Zookeeper实现通过Zookeeper的单条指令如创建节点、删除节点、查询并监听节点都是原子性的【因为没学过Zookeeper,对这里不是很理解是如何保障原子性的】,对于查找前驱节点和监听前驱节点两步操作直接无法保证原子性,通过使用以找到的前驱节点验证该节点存在并设置监听该节点的删除事件验证监听两步操作的原子性来保证前驱节点如果在监听前已经被释放则不再执行监听直接获取锁,避免前驱节点删除事件永远不会被触发导致请求线程一直被阻塞

    • 对于可重入,基于Redis实现通过UUID:线程ID作为Hash模式的Field,锁重入计数作为value,使用Lua脚本保证验证锁是当前线程的锁并对锁重入计数累加或者递减来实现的;基于Zookeeper实现有多种实现方式,这里使用的ThreadLocal保存当前线程的锁重入计数对锁重入计数增删实现的,还可以通过Zookeeper节点数据作为锁重入计数来实现,也可以参考Curator的基于ConcurrentHashMap来实现锁重入

    • 基于Zookeeper实现通过临时节点天然防客户端宕机死锁,不需要考虑锁的有效期问题和自动续期问题;基于Redis实现通过给Hash数据模型设置有效时间通过定时任务Timer或者Netty的时间轮来实现给锁自动续期

    • 对于锁载体单点故障,基于Redis实现使用集群形式来存放分布式锁可能由于IO开销导致的延迟在主机宕机时因为数据未同步导致锁失效,而只使用一台Redis可以认为一定会出现单点故障问题,在集群环境下需要使用红锁来解决锁失效的问题;Zookeeper服务器天然就要搭建为集群方式使用,Zookeeper集群是强一致性集群,一致性保障比Redis集群好很多,也不存在主节点宕机导致数据未同步丢失的情况

    • 基于Zookeeper实现的阻塞锁非常地方便,使用节点事件监听就能完成,但是基于Redis实现阻塞锁就非常地麻烦,公平锁方面Zookeeper通过序列号和阻塞队列相较于Redis也非常容易实现

      • 🔎:弹幕指出:说一下zk的问题,如果单个线程oom了,锁不释放,相当于死锁.。可重入锁都没有兜底threadlocal的clean,易引发内存泄露。

 

Curator



  1. 引入依赖

    • 使用Curator需要分别引入curator-frameworkcurator-recipes,注意这两个依赖中都含有zookeeper官方客户端依赖,使用zookeeper客户端需要与服务端的版本相同,实际上也没这么严格,我这里使用zookeeper3.5.7的服务器使用zookeeper3.7.0的客户端也没有出现任何问题,使用Curator并不需要使用zookeeper客户端,因此为了避免版本冲突问题,最好将zookeeper客户端从curator-frameworkcurator-recipes中排除出去,有需要的时候再单独进行引入

  2. 编写配置类初始化curator-framework的客户端CuratorFrameWork,该客户端对象类似于Redis客户端中的RedisTemplateRedisson中的RedissonClient

    • 配置类

      • CuratorFramework是一个接口,该接口有一个子接口WatcherRemoveCuratorFramework和一个子实现类CuratorFrameworkImpl,子实现类CuratorFrameworkImpl有两个子类NamespaceFacadeWatcherRemovalFacade,一般常用工厂类的newClient方法来初始化CuratorFramework组件,该方法有两个重载方法CuratorFramework. newClient()

        • Zookeeper官方客户端是不具备连接重试功能的,这就是Curator对Zookeeper官方客户端做出的优化之一

        • 其中newClient(String connectString, RetryPolicy retryPolicy)方法不需要指定会话和连接超时时间

          • 第一个参数是指定Zookeeper服务器地址,参数格式为192.168.200.132:2181,注释说这是一个服务器地址列表,虽然注释没指明具体格式,猜测使用逗号分隔多个服务器地址

          • 第二个参数retryPolicy是指定重试策略,RetryPolicy是一个接口,有两个直接子类分别是SleepingRetryRetryForever,前者表示有间歇的重试,后者表示持续重试,持续重试可能导致服务器浪费大量的资源,一般不推荐使用该策略;可以使用SleepingRetry的子类RetryNTime,该策略可以指定每隔多少时间重试一次,最多重试多少次;一般使用SleepingRetry的子类ExponentialBackoffRetry指数补偿重试,除了可以指定重试次数,还可以指定一个初始间隔时间,第一次在初始间隔时间重试,以后每次重试的间隔时间会递增,重试次数越多间隔时间越长,这样的策略设计更符合节省服务器资源的标准

        • 第二个重载方法CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)需要额外指定会话和连接超时时间

      • 初始化CuratorFramework对象以后需要使用start方法手动启动一下,否则Curator底层很多方法或者功能都是不工作的,即使调用了也无法使用,Curator的大多功能都通过该对象进行调用

       

InterProcessMutex可重入锁



  1. 概述

    • InterProcessMutex是一个普通类,可以直接通过构造方法初始化实例对象

      • 该构造方法需要我们上面初始化好的CuratorFramework客户端作为第一个参数,第二个参数需要分布式锁的用户自定义锁的路径

      • 注意InterProcessMutex是根据锁对象来区分是否同一把锁的,也就是不能通过向构造方法传参相同的path来获取同一把锁,如果使用相同的路径来调用构造方法创建不同的InterProcessMutex对象,会直接在Zookeeper中覆盖前一把锁的节点,导致锁无法被释放,程序一直卡住

        • 此时锁重入的锁对象就需要通过调用方法的参数来传递,这种方式极其不方便,因为这意味着定义方法的时候就要考虑锁重入的问题,还要考虑是哪一把锁重入的问题,非常地不灵活

  2. InterProcessMutex的用法示例

    • void ---> acquire() throws ExceptionInterProcessMutex的加锁方法,void release() throws ExceptionInterProcessMutex的解锁方法

  3. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用InterProcessMutex对库存数量5000进行单次扣减1,累计5000次扣减请求

    • 测试结果:

      • 1️⃣:吞吐量458,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

    • 结果分析:

      • 1️⃣:很菜,还没有我们自己实现的性能高

  4. 锁重入的使用方式

    • 错误示范

      • 这种方式来进行锁重入会导致程序运行卡死,因为InterProcessMutex只能通过对象来识别是否同一把锁,通过相同path来创建的两个InterProcessMutex实际上是两把不同的锁,但是因为路径相同,会直接覆盖掉Zookeeper服务器中先上锁创建的节点,导致第一把锁无法释放并导致程序卡死

    • 正确示范

      • 此时锁重入的锁对象就需要通过调用方法的参数来传递,这种方式极其不方便,因为这意味着定义方法的时候就要考虑锁重入传递锁对象的问题,还要考虑方法是不是自己上全新的锁,非常地不灵活;或者通过成员变量传参一个锁对象,总之非常不好

  5. 源码解析

    • 锁的初始化

      • 核心:构造了一个InterProcessMutex对象并将用户自定义根节点路径存入其中的basePath属性,校验用户自定义的父节点路径,构造一个LockInternals对象将标准锁内部驱动赋值给driver属性、通过用户自定义的curatorFramework构造可以监听的curatorFramework赋值给client属性、将用户自定义父节点路径赋值给basePath属性、将写死的lock-赋值给lockName属性、将最大租约数1赋值给maxLeases属性、将basePath+"/"+lockName赋值给path属性,将LockInternals对象赋值给InterProcessMutex对象的internals属性,后续主要通过internals属性的LockInternals对象来对锁进行操作

    • 加锁方法

      • 核心:这里面写的很绕但是核心逻辑还是和我们自己的实现差不多,使用ConcurrentHashMap来存储锁重入计数信息,加锁先从ConcurrentHashMapthreadData通过当前线程获取锁重入计数对象LockData,如果不为null说明本次上锁是锁重入,直接原子整数累加1;如果LockData为null就进入获取锁的阶段,会拼接出完整的节点对象路径并创建临时序列化节点,并获取子节点列表,判断创建的临时序列化节点的序列号在子节点列表中是否小于最大租约数,如果小于就获取锁成功,这里最大租约数默认写死就是1,所以只有最小的节点才能获取锁,返回获取锁的标志作为返回当前节点完整路径的标志并将路径了当前线程以及锁重入计数为1封装到LockData中传递给threadData并结束上锁方法;如果大于最大租约数就去监听当前节点下标-最大租约数的节点的删除事件,里面的事件监听并判断前驱节点是否仍然存在使用的getData方法,比我们使用的exists方法的优势在于exists方法不论节点是否存在都会进行监听,但是getData方法如果节点不存在就不会进行监听,这样可以避免客户端资源的泄露和浪费;面试按前面手动实现的过程结合这个流程来说即可

    • 解锁方法

      • 核心:释放锁的代码和我们自己实现的代码是一样的,释放锁先从ConcurrentHashMap中获取重入数据,直接对锁重入

       

InterProcessSemaphoreMutex不可重入锁



  1. 常用API

    • public InterProcessSemaphoreMutex(CuratorFramework client, String path);

      • 功能解析InterProcessSemaphoreMutex的构造方法,传参用户自定义的CuratorFramework对象和父节点路径path

      • 使用示例InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(curator, path);

        • 示例含义:构造分布式不可重入锁示例以获取锁对象

    • void ---> interProcessSemaphoreMutex.acquire();

      • 功能解析:获取锁interProcessSemaphoreMutex

      • 使用示例mutex.acquire();

        • 示例含义:阻塞直到获取到分布式锁

    • boolean ---> interProcessSemaphoreMutex.acquire(long time, TimeUnit unit);

      • 功能解析:带超时机制的获取锁InterProcessSemaphoreMutex,如果到阻塞指定时间仍没有获取到锁放弃等待锁

      • 使用示例mutex.acquire(10, TimeUnit.SECONDS);

        • 示例含义:十秒内尝试获取锁,超过十秒未获取放弃获取锁

    • void ---> release();

      • 功能解析:释放锁interProcessSemaphoreMutex

      • 使用示例mutex.release()

        • 示例含义:释放已占有的锁

  2. 用法示例

 

InterProcessReadWriteLock可重入读写锁



  1. 常用API

    • public InterProcessReadWriteLock(CuratorFramework client, String basePath);

      • 功能解析InterProcessReadWriteLock的构造方法,传参用户自定义的CuratorFramework对象和父节点路径path

      • 使用示例InterProcessReadWriteLock rwlock = new InterProcessReadWriteLock(curatorFramework, "/curator/rwlock");

        • 示例含义:构造父节点路径为/curator/rwlock分布式可重入读写锁示例以获取锁对象

    • InterProcessMutex ---> interProcessReadWriteLock.readLock();

      • 功能解析:从读写锁interProcessReadWriteLock中获取读锁

      • 使用示例rwlock.readLock().acquire(10, TimeUnit.SECONDS);

        • 示例含义:从读写锁获取读锁并使用过10s自动解锁的方式来释放锁

      • 补充说明

        • 读锁和写锁都创建的是传参不同的InterProcessReadWriteLock下的静态内部类InternalInterProcessMutex对象,该静态内部类继承自InterProcessMutex

        • 这个读写锁的acquire(10, TimeUnit.SECONDS)方法和Redisson中的读写锁RReadWriteLocklock(10, TimeUnit.SECONDS)一样意思不是带等待超时时间的加锁,而是获取锁以后过10s自动释放锁,一定要注意这点

    • InterProcessMutex ---> interProcessReadWriteLock.writeLock();

      • 功能解析:从读写锁interProcessReadWriteLock中获取写锁

      • 使用示例rwlock.writeLock().acquire(10, TimeUnit.SECONDS);

        • 示例含义:从读写锁获取写锁并上锁,获取锁10s钟以后自动释放锁

    • boolean ---> internalInterProcessMutex.acquire(long time, TimeUnit unit);

      • 功能解析:从读写锁interProcessReadWriteLock中获取写锁或者读锁,并从获取锁开始到指定时间后自动释放锁

      • 使用示例rwlock.writeLock().acquire(10, TimeUnit.SECONDS);rwlock.readLock().acquire(10, TimeUnit.SECONDS);

        • 示例含义从读写锁获取写锁并上锁,获取锁10s钟以后自动释放锁从读写锁获取读锁并上锁,获取锁10s钟以后自动释放锁

      • 补充说明

        • 读锁和写锁都是InterProcessReadWriteLock中继承自InterProcessMutex的静态内部类internalInterProcessMutex,他的加锁acquire()方法和解锁release()方法都来自于父类InterProcessMutex,该加锁方法的意思是获取锁等待指定时间自动释放锁,即使当前线程任务执行完了也要阻塞等到10s钟以后锁自动释放了才能将请求返回给用户

        • 这个读写锁的acquire(10, TimeUnit.SECONDS)方法和Redisson中的读写锁RReadWriteLocklock(10, TimeUnit.SECONDS)一样意思不是带等待超时时间的加锁,而是获取锁以后过10s自动释放锁,一定要注意这点

    • void ---> internalInterProcessMutex.acquire();

      • 功能解析:从读写锁interProcessReadWriteLock中获取写锁或读锁,这种上锁方式必须手动释放锁

      • 使用示例rwlock.writeLock().acquire(10, TimeUnit.SECONDS);rwlock.readLock().acquire(10, TimeUnit.SECONDS);

        • 示例含义从读写锁获取写锁并上锁从读写锁获取读锁并上锁

      • 补充说明

        • 这种无参上锁方式必须手动释放锁

    • void ---> internalInterProcessMutex.release();

      • 功能解析:写锁或读锁手动释放锁

      • 使用示例rwlock.writeLock().release();rwlock.readLock().release();

        • 示例含义写锁主动释放锁读锁主动释放锁

  2. 用法示例

    • 注意:读锁即使调用了到指定时间释放锁的acquire(10, TimeUnit.SECONDS)方法,但是请求线程业务执行完会立即将结果返回给用户,不会阻塞一直等待读锁被自动释放;但是如果写锁调用了到指定时间释放锁的acquire(10, TimeUnit.SECONDS)方法,请求线程即使执行完业务方法但是锁还没有到时间被自动释放掉,请求线程会一直阻塞不响应给用户直到写锁被自动释放,每次都等10s待锁释放以后再将响应结果返回给用户

      • 而且由于写锁会阻塞读锁,写锁没有被自动释放在10s等待期间内,请求线程无法获取到读锁,用户线程也会被阻塞到写锁自动释放以后才能立即获取读锁,执行完业务方法后返回响应结果,此时用户请求线程无需等到读锁到10s自动释放就能响应给客户端

      • 同时读锁也会阻塞写锁,用户线程获取读锁并使用acquire(10, TimeUnit.SECONDS)方法上锁,因为读锁不会因为锁还没到指定时间自动释放锁阻塞用户请求线程,因此获取读锁的用户线程执行完业务方法会立即响应,但是此时读锁并没有释放,还是要等到指定10s时间才会自动释放,如果在获取读锁的用户请求刚到达服务器就紧跟着一个获取写锁的用户请求,获取写锁的用户请求会首先被前一个请求的读锁阻塞10s无法获取写锁,然后还会被写锁到达指定10s时间自动释放写锁再阻塞10s,用户获取写锁的请求线程从请求发出到收到响应一共会被阻塞20s的时间

      • 这是InterProcessReadWriteLock可重入读写锁相对于其他第三方提供的读写锁的一个独特特征,即写锁在释放之前会一直阻塞请求线程,而读锁不会,一定要特别注意

     

InterProcessMultiLock联锁



  1. 常用API

 

 

InterProcessSemaphoreV2信号量



  1. 常用API

    • public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases)

      • 功能解析InterProcessSemaphoreV2的构造方法,传参用户自定义的CuratorFramework对象、父节点路径path和该实例允许的最大租借数量

      • 使用示例InterProcessSemaphoreV2 semaphoreV2 = new InterProcessSemaphoreV2(curatorFramework, "/locks/semaphore", 5)

        • 示例含义:构造父节点路径为/locks/semaphore的分布式信号量,设置最大可允许同时运行的线程数为5

      • 补充说明leases的意思就是租约的意思,表示租赁的契约

    • Lease ---> interProcessSemaphoreV2.acquire()

      • 功能解析:获取运行许可,只有当剩余可允许运行线程数大于0时才能成功获取到许可执行后续代码,否则当前线程就在获取许可处阻塞等待其他线程释放许可,获取成功返回一个租约对象Lease

      • 使用示例Lease acquire = semaphoreV2.acquire()

        • 示例含义:当前线程尝试获取运行许可,并返回获取到的租约对象

    • void ---> interProcessSemaphoreV2.returnLease(Lease lease)

      • 功能解析:释放当前线程占有的运行许可,需要传参当前线程获取许可成功时的返回租约对象lease

      • 使用示例semaphoreV2.returnLease(acquire)

        • 示例含义:当前线程释放已经占有的运行许可

  2. 代码示例

     

共享计数器



SharedCount
  1. 常用API

    • public SharedCount(CuratorFramework client, String path, int seedValue)

      • 功能解析:实例化共享计数器SharedCount对象并指定该对象的父节点路径和计数初始值

      • 使用示例SharedCount sharedCount = new SharedCount(curatorFrameworkClient, "/curator/count", 0);

        • 示例含义:构造父节点路径为/curator/count的共享计数对象并指定初始计数值为0

    • void ---> sharedCount.start()

      • 功能解析:共享计数器手动启动,只有手动启动了的sharedCount对象才能使用完整的功能

      • 使用示例sharedCount.start();

        • 示例含义:手动启动sharedCount对象

    • int ---> sharedCount.getCount()

      • 功能解析:获取共享计数对象中的共享值

      • 使用示例int count = sharedCount.getCount();

        • 示例含义:获取共享计数对象的共享值

    • void ---> sharedCount.setCount(int newCount)

      • 功能解析:修改共享计数对象中的共享值

      • 使用示例sharedCount.setCount(random);

        • 示例含义:将共享计数器SharedCount对象的共享值设定为指定值

    • void ---> sharedCount.close()

      • 功能解析:使用完SharedCount对象需要调用close方法手动进行关闭

      • 使用示例sharedCount.close()

        • 示例含义:手动关闭共享计数器

    • boolean ---> sharedCount.trySetCount(int newCount)

      • 功能解析:当版本号没有变化时,才会更新共享变量的值

      • 使用示例:``

        • 示例含义

      • 补充说明:该方法官方标注已过时

    • void ---> sharedCount.addListener(final SharedCountListener listener)

      • 功能解析:给共享计数对象添加监听器,将来一旦共享值发生变化,监听器可以监听到值的变化

      • 使用示例

        • 示例含义:共享值发生变化时打印被修改后的共享值

      • 补充说明SharedCount的该方法在日常使用中不是特别频繁,原因是另外一个类DistributedAtomicNumber对应的功能更加强大灵活

  2. 用法示例

     

DistributedAtomicNumber


  1. API

    • public DistributedAtomicLong(CuratorFramework client, String counterPath, RetryPolicy retryPolicy)

      • 功能解析:实例化DistributedAtomicLong对象并指定该对象的父节点路径和重试策略

      • 使用示例:``

        • 示例含义

    • AtomicValue<Long> ---> distributedAtomicLong.trySet(Long newValue)

      • 功能解析:尝试给DistributedAtomicLong对象设置计数值

      • 使用示例:``

        • 示例含义

    • boolean ---> distributedAtomicLong.initialize(Long initialize)

      • 功能解析:给DistributedAtomicLong对象的计数值设置初始值

      • 使用示例:``

        • 示例含义

    • AtomicValue<Long> ---> distributedAtomicLong.get()

      • 功能解析:获取DistributedAtomicLong对象的计数值

      • 使用示例:``

        • 示例含义

    • void ---> distributedAtomicLong.forceSet(Long newValue)

      • 功能解析:给DistributedAtomicLong对象的计数值强制设置为新值newValuetrySet()方法可能会设置失败,但是forceSet()方法一定能设置成功

      • 使用示例:``

        • 示例含义

    • AtomicValue<Long> ---> distributedAtomicLong.increment()

      • 功能解析:让DistributedAtomicLong对象的计数值增加1

      • 使用示例:``

        • 示例含义

    • AtomicValue<Long> ---> distributedAtomicLong.decrement()

      • 功能解析:让DistributedAtomicLong对象的计数值减去1

      • 使用示例:``

        • 示例含义

    • AtomicValue<Long> ---> distributedAtomicLong.add(Long delta)

      • 功能解析:让DistributedAtomicLong对象的计数值加上指定值delta

      • 使用示例:``

        • 示例含义

 

基于mysql实现分布式锁

  1. 环境搭建

    • 分布式锁表结构设计参考

      • 设置lock_name字段唯一键索引,其他字段按需自行设计

    • 设计对应实体类

    • 对应的Mapper接口

      • 这是引入MyBatisPlus来简化单表sql情况下的配置

  2. 分布式锁代码实现

    • 这里没有对锁进行封装,这也是非常简陋的版本,没有考虑死锁、重入、续期、公平、正确释放锁、阻塞队列等一系列问题,因为这种实现因为性能问题企业开发不常用

  3. 测试

    • 测试环境:

      • 1️⃣:100个用户线程,1s内每个用户线程发起50次扣减库存请求,总共发起5000次,场景为请求连接nginx负载均衡两个运行实例操作虚拟机上的同一个Redis数据库的同一个共享数据,使用上述基于mysql实现的分布式锁对库存数量5000进行单次扣减1,累计5000次扣减请求

    • 测试结果:

      • 1️⃣:吞吐量194,5000次扣减操作全部成功,Redis中库存数据应从5000减至0,实际是0,没有出现线程安全问题

 

小结

  1. 借助唯一键索引使用插入语句通过成功创建记录竞争锁

  2. 客户端程序获取到锁后宕机来不及释放锁可能导致死锁

    • 可以通过给分布式锁表添加锁的创建时间create_time字段,通过定时调度任务来定时检查锁的生成时间到当前时间的间隔是否超过某个阈值,超过阈值就把对应的锁自动释放掉来避免服务器宕机引起死锁

    • 老师说这种防死锁必须是一个独立的服务来实现该功能才能实现宕机导致的死锁,不能在应用程序客户端里面,估计是不同服务的系统时间有差异,老师没说原因

  3. 不可重入锁也可能导致死锁

    • 可以通过给分布式表添加一个服务器id使用uuid来对服务器进行标识,添加一个线程id字段用来记录是哪一个线程获取到锁,添加一个锁重入计数lock_count字段来记录当前线程的锁重入次数,发生重入就直接去更新对应的所重入计数字段lock_count,否则就去竞争锁

  4. 通过MyBatisPlus的主键回写和通过主键删除记录在不考虑锁续期的情况下当前实现就能防误删

  5. 目前的插入一条记录和删除一条记录都是单句sqlmysql针对这种修改sql会自动开启本地事务,事务的底层也是通过锁实现的,能保证原子性,但是要实现可重入就可能需要使用多条SQL才能完成获取锁或者释放锁的操作,此时可以借助mysql的悲观锁来保证多步操作的原子性

  6. 可重入已经在防死锁中分析过了

  7. 自动续期通过当前服务用定时任务重置锁创建时间即可,锁失效要么锁被释放,要么被防死锁服务自动删除

  8. 单点故障问题可以通过搭建mysql的主从集群来解决,但是mysql的主从集群锁失效的情况和Redis很类似,也是一样存在的;备用服务器还没来得及同步主服务器数据,主服务器就挂掉了,从升级为主丢失原来的锁数据,这个问题也不太好解决

  9. 阻塞锁在mysql中也比较难实现

  10. 受制于数据库性能,基于mysql实现的分布式锁并发能力很有限,这个问题也几乎无法解决

 

总结

  1. 从分布式锁实现难以程度上来说

    • 基于mysql的分布式锁实现最容易,只需要设计一张表使用插入语句和删除语句就能实现分布式锁,而且一般应用程序都会使用到数据库

    • 基于Redis实现的分布式锁稍微麻烦一点,需要使用lua和Reids服务器

    • 基于ZK实现的分布式锁因为Zookeeper客户端的复杂性,在自己实现分布式锁的时候相对比较麻烦,而且还需要Zookeeper服务器,一般还得是集群

  2. 从性能上来说

    • 基于Redis实现的分布式锁性能最好,我们自己实现的吞吐量也能轻松上600,使用Redisson提供的RedissonLock甚至可以接近900

    • 基于ZK实现的分布式锁性能略差,我们自己实现的吞吐量550左右,Curator提供的InterProcessMutex不清楚具体原因吞吐量只有450左右

    • 基于Mysql的实现即使在最简易的实现下也只有190的并发量,还没有考虑各种必要功能实现

  3. 从可靠性方面来说

    • 基于Zookeeper的实现可靠性最高,因为Zookeeper本身就是分布式系统协调组件,Zookeeper集群是强一致性性集群,集群环境下不容易发生锁数据未及时同步导致的锁失效问题

    • 基于Redis和mysql实现的分布式锁在集群环境下可靠性是差不多的,因为都是通过网络IO,将主机操作通过多次IO同步到从机上,一旦主机宕机且锁数据没来得及同步上,就会发生锁失效问题

  4. 根据以上特征要结合项目应用场景的要求来选择合适的方案

    • 比如简单场景,并发量不高的情况下使用基于mysql的实现几行代码就能实现一个分布式锁

    • 追求性能,并发量高就优先选择基于Redis的实现

    • 如果追求可靠性可以考虑基于ZK实现的分布式锁

附录

  1. 预热慢的原因之一是很多实例对象都是懒加载的,第一次运行会去创建这些实例对象,因此第一次运行的速度会慢一些,此外还有JIT即时编译器对部分代码片段的运行优化也会提升系统的运行效率

  2. 老师说不推荐实际工作中通过线程池工具类Executors来初始化线程池,可能会导致内存溢出,Alibaba的开发规范也不推荐使用该工具类来初始化线程池,老师说应该通过构造方法来初始化,关注以下定时调度任务和分布式定时调度任务相关的专题

  3. 不要使用springframeworkStringUtils,老师说这个StringUtils很垃圾,推荐使用commons-lang3StringUtils

  4. 关注java.util.Collections中的Collectiosns.sort(nodes)排序方法和Collections.binarySearch()获取节点下标方法,总之关注一下java.util.Collections类的使用方法

  5. 关注commons-lang3下的StringUtils类中的api用法

  6. 常见锁分类

    • 悲观锁:具有强烈的独占和排他特性,在整个数据处理过程中,将数据处于锁定状态。适合于写比较多,会阻塞读操作。

    • 乐观锁:采取了更加宽松的加锁机制,大多是基于数据版本( Version )及时间戳来实现。。适合于读比较多,不会阻塞读

    • 独占锁、互斥锁、排他锁:保证在任一时刻,只能被一个线程独占排他持有。synchronized、ReentrantLock

    • 共享锁:可同时被多个线程共享持有。CountDownLatch到计数器、Semaphore信号量

    • 可重入锁:又名递归锁。同一个线程在外层方法获取锁的时候,在进入内层方法时会自动获取锁。

    • 不可重入锁:例如早期的synchronized

    • 公平锁:有优先级的锁,先来先得,谁先申请锁就先获取到锁

    • 非公平锁:无优先级的锁,后来者也有机会先获取到锁

    • 自旋锁:当线程尝试获取锁失败时(锁已经被其它线程占用了),无限循环重试尝试获取锁

    • 阻塞锁:当线程尝试获取锁失败时,线程进入阻塞状态,直到接收信号后被唤醒。在竞争激烈情况下,性能较高

    • 读锁:读读并发的共享锁

    • 写锁:读写互斥,写写互斥的独占排他锁

    • 偏向锁:一直被一个线程所访问,那么该线程会自动获取锁

    • 轻量级锁(CAS):当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

    • 重量级锁:当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候(10次),还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让他申请的线程进入阻塞,性能降低。

    • 表级锁:对整张表加锁,加锁快开销小,不会出现死锁,但并发度低,会增加锁冲突的概率 行级锁:是mysql粒度最小的锁,只针对操作行,可大大减少锁冲突概率,并发度高,但加锁慢,开销大,会出现死锁

  7. Redis提供的各语言对红锁算法实现的框架

    • 通过这可以了解到常用的基于Redis的分布式锁解决方案